Re: Make COPY format extendable: Extract COPY TO format implementations - Mailing list pgsql-hackers

From Sutou Kouhei
Subject Re: Make COPY format extendable: Extract COPY TO format implementations
Date
Msg-id 20250131.004213.996398325029997587.kou@clear-code.com
In response to Re: Make COPY format extendable: Extract COPY TO format implementations  (Masahiko Sawada <sawada.mshk@gmail.com>)
List pgsql-hackers
Hi,

In <CAD21AoDyBJrCsh5vNFWcRmS0_XKCCCP4gLzZnLCayYccLpaBfw@mail.gmail.com>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Tue, 28 Jan 2025 15:00:03 -0800,
  Masahiko Sawada <sawada.mshk@gmail.com> wrote:

> While 0001 and 0002 look good to me overall, we still need to polish
> subsequent patches. Here are review comments:

I've attached the v29 patch set, which applies your suggestions:

Refactoring:
0001-0002: There are some trivial changes (copyright year
           change and some comment fixes)

COPY TO related:
0003: Applied your suggestions on copyto_internal.h,
      CopyToGetRoutine() and the built-in CopyToRoutine
0004: Applied your suggestion on copyto_internal.h
0005: No change

COPY FROM related:
0006: Applied your suggestions on copyfrom_internal.h,
      CopyFromGetRoutine() and the built-in CopyFromRoutine
0007: Applied your suggestion on copyfrom_internal.h
0008: Applied your suggestion on CopyFromStateRead()
0009: No change


> I still find that it would not be a good idea to move all copy-related
> struct definitions to copyapi.h because we need to include copyapi.h
> file into a .c file even if the file is not related to the custom copy
> format routines. I think that copyapi.h should have only the
> definitions of CopyToRoutine and CopyFromRoutine as well as some
> functions related to the custom copy format. Here is an idea:
> 
> - CopyToState and CopyFromState are defined in copyto_internal.h (new
> file) and copyfrom_internal.h, respectively.
> - These two files #include's copy.h and other necessary header files.
> - copyapi.h has only CopyToRoutine and CopyFromRoutine and #include's
> both copyfrom_internal.h and copyto_internal.h.
> - copyto.c, copyfrom.c and copyfromparse.c #include copyapi.h
> 
> Some advantages of this idea:
> 
> - we can keep both CopyToState and CopyFromState private in _internal.h files.
> - custom format extension can include copyapi.h to provide a custom
> copy format routine and to access the copy state data.
> - copy-related .c files won't need to include copyapi.h if they don't
> use custom copy format routines.

Hmm. I thought Copy{To,From}State was a "public" API for
extensions, not a "private" one, because extensions need to
access at least Copy{To,From}State::opaque directly. If we
want to make Copy{To,From}State private, I think that we
should provide getters/setters for the needed members of
Copy{To,From}State, such as
Copy{To,From}State{Get,Set}Opaque().

That was the design in the v2 patch set:
https://www.postgresql.org/message-id/20231221.183504.1240642084042888377.kou%40clear-code.com
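
As a rough illustration of the getter/setter approach (a sketch only: the
struct below is a simplified stand-in for CopyToStateData, and the accessor
names just follow the Copy{To,From}State{Get,Set}Opaque() pattern mentioned
above, not any committed API):

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-in; the real CopyToStateData has many more members. */
typedef struct CopyToStateData
{
    void       *opaque;         /* private data owned by the format */
} CopyToStateData;
typedef CopyToStateData *CopyToState;

/* Hypothetical accessor: read the format's private data. */
static inline void *
CopyToStateGetOpaque(CopyToState cstate)
{
    assert(cstate != NULL);
    return cstate->opaque;
}

/* Hypothetical accessor: store the format's private data. */
static inline void
CopyToStateSetOpaque(CopyToState cstate, void *opaque)
{
    assert(cstate != NULL);
    cstate->opaque = opaque;
}
```

With accessors like these, a custom format could store its data in the start
callback and fetch it in the per-row callback without seeing the struct
layout.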

We discussed that we can make CopyToState public:
https://www.postgresql.org/message-id/CAD21AoD%3DUapH4Wh06G6H5XAzPJ0iJg9YcW8r7E2UEJkZ8QsosA%40mail.gmail.com

What does "private" mean here? I thought it meant "only
PostgreSQL itself can use it", but it seems you mean
"PostgreSQL itself and custom format extensions can use it,
but other extensions can't".

I'm not familiar with the "_internal.h" convention in
PostgreSQL. Is "_internal.h" meant for the latter sense of
"private"?


> The 0008 patch introduces CopyFromStateRead(). While it would be a
> good start, I think we can consider sorting out low-level
> communication functions more. For example, CopyReadBinaryData() uses
> the internal 64kB buffer but some custom copy format extensions might
> want to use a larger buffer in its own implementation, which would
> require exposing CopyGetData() etc. Given that we might expose more
> functions to provide more ways for extensions, we might want to rename
> CopyFromStateRead().

So, as the first step, we only need to expose the low-level
CopyGetData(), not the high-level CopyReadBinaryData(),
right?

I agree that we should start from a minimal API set.

I've renamed CopyFromStateRead() to CopyFromStateGetData()
because it wraps CopyGetData() now.
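
A minimal sketch of that wrapping, using stand-ins throughout: the struct,
the in-memory data source and the argument list here are illustrative
assumptions, not the signatures in the patch.

```c
#include <assert.h>
#include <string.h>

/* Stand-in state: reads from an in-memory buffer instead of a file/socket. */
typedef struct CopyFromStateData
{
    const char *src;            /* assumed data source */
    int         src_len;        /* total bytes available */
    int         src_pos;        /* bytes consumed so far */
} CopyFromStateData;
typedef CopyFromStateData *CopyFromState;

/*
 * Stand-in for the internal low-level reader. Copies up to maxread bytes
 * and returns the number copied (0 at end of input). minread is ignored
 * because this in-memory source never blocks.
 */
static int
CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
{
    int         avail = cstate->src_len - cstate->src_pos;
    int         n = avail < maxread ? avail : maxread;

    (void) minread;
    assert(maxread >= 0);
    memcpy(databuf, cstate->src + cstate->src_pos, n);
    cstate->src_pos += n;
    return n;
}

/* Thin wrapper that would be exported to custom format extensions. */
int
CopyFromStateGetData(CopyFromState cstate, void *dest, int minread, int maxread)
{
    return CopyGetData(cstate, dest, minread, maxread);
}
```

An extension that wants a buffer larger than the internal one could pass its
own destination buffer and size to the wrapper.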


> While we get the format routines for custom formats in
> ProcessCopyOptionFormat(), we do that for built-in formats in
> BeginCopyTo(), which seems odd to me. I think we can have
> CopyToGetRoutine() responsible for getting CopyToRoutine for built-in
> formats as well as custom format. The same is true for
> CopyFromRoutine.

I like the current design because we don't need to export
CopyToGetBuiltinRoutine() (it can be static), but I've
applied your suggestion because I don't have a strong
opinion on this.
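
The suggested shape, one lookup function covering both built-in and custom
formats, could be sketched as follows. The types are stand-ins, and the
name-based lookup and the "custom" argument are assumptions for illustration
(in 0001, CopyToGetRoutine() takes CopyFormatOptions instead):

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Stand-in for CopyToRoutine: only a label, no callbacks. */
typedef struct CopyToRoutine
{
    const char *label;
} CopyToRoutine;

static const CopyToRoutine CopyToRoutineText = {"text"};
static const CopyToRoutine CopyToRoutineCSV = {"csv"};
static const CopyToRoutine CopyToRoutineBinary = {"binary"};

/*
 * Single entry point: resolve built-in formats first, then fall back to
 * the routine supplied by a custom format handler (may be NULL).
 */
static const CopyToRoutine *
CopyToGetRoutine(const char *format, const CopyToRoutine *custom)
{
    assert(format != NULL);

    if (strcmp(format, "text") == 0)
        return &CopyToRoutineText;
    if (strcmp(format, "csv") == 0)
        return &CopyToRoutineCSV;
    if (strcmp(format, "binary") == 0)
        return &CopyToRoutineBinary;

    return custom;
}
```

With this shape, callers such as BeginCopyTo() need no separate path for
built-in formats.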


> Copy[To|From]Routine for built-in formats are missing to set the node type.

Oh, sorry. I missed this.


Thanks,
-- 
kou

From eef8c0bc18a489fea352db242dd9e16003132243 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sat, 28 Sep 2024 23:24:49 +0900
Subject: [PATCH v29 1/9] Refactor COPY TO to use format callback functions.

This commit introduces a new CopyToRoutine struct, which is a set of
callback routines to copy tuples in a specific format. It also makes
the existing formats (text, CSV, and binary) utilize these format
callbacks.

This change is a preliminary step towards making the COPY TO command
extensible in terms of output formats.

Additionally, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when sending field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.

Author: Sutou Kouhei
Reviewed-by: Michael Paquier, Tomas Vondra, Masahiko Sawada
Reviewed-by: Junwang Zhao
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com
---
 src/backend/commands/copyto.c    | 441 +++++++++++++++++++++----------
 src/include/commands/copyapi.h   |  57 ++++
 src/tools/pgindent/typedefs.list |   1 +
 3 files changed, 358 insertions(+), 141 deletions(-)
 create mode 100644 src/include/commands/copyapi.h

diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 99cb23cb347..26c67ddc351 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -19,7 +19,7 @@
 #include <sys/stat.h>
 
 #include "access/tableam.h"
-#include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
@@ -64,6 +64,9 @@ typedef enum CopyDest
  */
 typedef struct CopyToStateData
 {
+    /* format-specific routines */
+    const CopyToRoutine *routine;
+
     /* low-level state data */
     CopyDest    copy_dest;        /* type of copy source/destination */
     FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
@@ -114,6 +117,19 @@ static void CopyAttributeOutText(CopyToState cstate, const char *string);
 static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
                                 bool use_quote);
 
+/* built-in format-specific routines */
+static void CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc);
+static void CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
+static void CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot);
+static void CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot);
+static void CopyToTextLikeOneRow(CopyToState cstate, TupleTableSlot *slot,
+                                 bool is_csv);
+static void CopyToTextLikeEnd(CopyToState cstate);
+static void CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc);
+static void CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo);
+static void CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
+static void CopyToBinaryEnd(CopyToState cstate);
+
 /* Low-level communications functions */
 static void SendCopyBegin(CopyToState cstate);
 static void SendCopyEnd(CopyToState cstate);
@@ -121,9 +137,254 @@ static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
 static void CopySendString(CopyToState cstate, const char *str);
 static void CopySendChar(CopyToState cstate, char c);
 static void CopySendEndOfRow(CopyToState cstate);
+static void CopySendTextLikeEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * COPY TO routines for built-in formats.
+ *
+ * CSV and text formats share the same TextLike routines except for the
+ * one-row callback.
+ */
+
+/* text format */
+static const CopyToRoutine CopyToRoutineText = {
+    .CopyToStart = CopyToTextLikeStart,
+    .CopyToOutFunc = CopyToTextLikeOutFunc,
+    .CopyToOneRow = CopyToTextOneRow,
+    .CopyToEnd = CopyToTextLikeEnd,
+};
+
+/* CSV format */
+static const CopyToRoutine CopyToRoutineCSV = {
+    .CopyToStart = CopyToTextLikeStart,
+    .CopyToOutFunc = CopyToTextLikeOutFunc,
+    .CopyToOneRow = CopyToCSVOneRow,
+    .CopyToEnd = CopyToTextLikeEnd,
+};
+
+/* binary format */
+static const CopyToRoutine CopyToRoutineBinary = {
+    .CopyToStart = CopyToBinaryStart,
+    .CopyToOutFunc = CopyToBinaryOutFunc,
+    .CopyToOneRow = CopyToBinaryOneRow,
+    .CopyToEnd = CopyToBinaryEnd,
+};
+
+/* Return a COPY TO routine for the given options */
+static const CopyToRoutine *
+CopyToGetRoutine(CopyFormatOptions opts)
+{
+    if (opts.csv_mode)
+        return &CopyToRoutineCSV;
+    else if (opts.binary)
+        return &CopyToRoutineBinary;
+
+    /* default is text */
+    return &CopyToRoutineText;
+}
+
+/* Implementation of the start callback for text and CSV formats */
+static void
+CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    /*
+     * For non-binary copy, we need to convert null_print to file encoding,
+     * because it will be sent directly with CopySendString.
+     */
+    if (cstate->need_transcoding)
+        cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+                                                          cstate->opts.null_print_len,
+                                                          cstate->file_encoding);
+
+    /* if a header has been requested send the line */
+    if (cstate->opts.header_line)
+    {
+        ListCell   *cur;
+        bool        hdr_delim = false;
+
+        foreach(cur, cstate->attnumlist)
+        {
+            int            attnum = lfirst_int(cur);
+            char       *colname;
+
+            if (hdr_delim)
+                CopySendChar(cstate, cstate->opts.delim[0]);
+            hdr_delim = true;
+
+            colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+            if (cstate->opts.csv_mode)
+                CopyAttributeOutCSV(cstate, colname, false);
+            else
+                CopyAttributeOutText(cstate, colname);
+        }
+
+        CopySendTextLikeEndOfRow(cstate);
+    }
+}
+
+/*
+ * Implementation of the outfunc callback for text and CSV formats. Assign
+ * the output function data to the given *finfo.
+ */
+static void
+CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    Oid            func_oid;
+    bool        is_varlena;
+
+    /* Set output function for an attribute */
+    getTypeOutputInfo(atttypid, &func_oid, &is_varlena);
+    fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the per-row callback for text format */
+static void
+CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextLikeOneRow(cstate, slot, false);
+}
+
+/* Implementation of the per-row callback for CSV format */
+static void
+CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextLikeOneRow(cstate, slot, true);
+}
+
+/*
+ * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow().
+ *
+ * We use pg_attribute_always_inline to reduce function call overheads.
+ */
+static pg_attribute_always_inline void
+CopyToTextLikeOneRow(CopyToState cstate,
+                     TupleTableSlot *slot,
+                     bool is_csv)
+{
+    bool        need_delim = false;
+    FmgrInfo   *out_functions = cstate->out_functions;
+
+    foreach_int(attnum, cstate->attnumlist)
+    {
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (need_delim)
+            CopySendChar(cstate, cstate->opts.delim[0]);
+        need_delim = true;
+
+        if (isnull)
+        {
+            CopySendString(cstate, cstate->opts.null_print_client);
+        }
+        else
+        {
+            char       *string;
+
+            string = OutputFunctionCall(&out_functions[attnum - 1],
+                                        value);
+
+            /*
+             * is_csv will be optimized away by compiler, as argument is
+             * constant at caller.
+             */
+            if (is_csv)
+                CopyAttributeOutCSV(cstate, string,
+                                    cstate->opts.force_quote_flags[attnum - 1]);
+            else
+                CopyAttributeOutText(cstate, string);
+        }
+    }
+
+    CopySendTextLikeEndOfRow(cstate);
+}
+
+/* Implementation of the end callback for text and CSV formats */
+static void
+CopyToTextLikeEnd(CopyToState cstate)
+{
+    /* Nothing to do here */
+}
+
+/*
+ * Implementation of the start callback for binary format. Send a header
+ * for a binary copy.
+ */
+static void
+CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    int32        tmp;
+
+    /* Signature */
+    CopySendData(cstate, BinarySignature, 11);
+    /* Flags field */
+    tmp = 0;
+    CopySendInt32(cstate, tmp);
+    /* No header extension */
+    tmp = 0;
+    CopySendInt32(cstate, tmp);
+}
+
+/*
+ * Implementation of the outfunc callback for binary format. Assign
+ * the binary output function to the given *finfo.
+ */
+static void
+CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    Oid            func_oid;
+    bool        is_varlena;
+
+    /* Set output function for an attribute */
+    getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena);
+    fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the per-row callback for binary format */
+static void
+CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    FmgrInfo   *out_functions = cstate->out_functions;
+
+    /* Binary per-tuple header */
+    CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+    foreach_int(attnum, cstate->attnumlist)
+    {
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (isnull)
+        {
+            CopySendInt32(cstate, -1);
+        }
+        else
+        {
+            bytea       *outputbytes;
+
+            outputbytes = SendFunctionCall(&out_functions[attnum - 1],
+                                           value);
+            CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+            CopySendData(cstate, VARDATA(outputbytes),
+                         VARSIZE(outputbytes) - VARHDRSZ);
+        }
+    }
+
+    CopySendEndOfRow(cstate);
+}
+
+/* Implementation of the end callback for binary format */
+static void
+CopyToBinaryEnd(CopyToState cstate)
+{
+    /* Generate trailer for a binary copy */
+    CopySendInt16(cstate, -1);
+    /* Need to flush out the trailer */
+    CopySendEndOfRow(cstate);
+}
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -191,16 +452,6 @@ CopySendEndOfRow(CopyToState cstate)
     switch (cstate->copy_dest)
     {
         case COPY_FILE:
-            if (!cstate->opts.binary)
-            {
-                /* Default line termination depends on platform */
-#ifndef WIN32
-                CopySendChar(cstate, '\n');
-#else
-                CopySendString(cstate, "\r\n");
-#endif
-            }
-
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -235,10 +486,6 @@ CopySendEndOfRow(CopyToState cstate)
             }
             break;
         case COPY_FRONTEND:
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->opts.binary)
-                CopySendChar(cstate, '\n');
-
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
@@ -254,6 +501,35 @@ CopySendEndOfRow(CopyToState cstate)
     resetStringInfo(fe_msgbuf);
 }
 
+/*
+ * Wrapper function of CopySendEndOfRow for text and CSV formats. Sends the
+ * line termination and then does the common end-of-row processing.
+ */
+static inline void
+CopySendTextLikeEndOfRow(CopyToState cstate)
+{
+    switch (cstate->copy_dest)
+    {
+        case COPY_FILE:
+            /* Default line termination depends on platform */
+#ifndef WIN32
+            CopySendChar(cstate, '\n');
+#else
+            CopySendString(cstate, "\r\n");
+#endif
+            break;
+        case COPY_FRONTEND:
+            /* The FE/BE protocol uses \n as newline for all platforms */
+            CopySendChar(cstate, '\n');
+            break;
+        default:
+            break;
+    }
+
+    /* Now take the actions related to the end of a row */
+    CopySendEndOfRow(cstate);
+}
+
 /*
  * These functions do apply some data conversion
  */
@@ -426,6 +702,9 @@ BeginCopyTo(ParseState *pstate,
     /* Extract options from the statement node tree */
     ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
 
+    /* Set format routine */
+    cstate->routine = CopyToGetRoutine(cstate->opts);
+
     /* Process the source/target relation or query */
     if (rel)
     {
@@ -771,19 +1050,10 @@ DoCopyTo(CopyToState cstate)
     foreach(cur, cstate->attnumlist)
     {
         int            attnum = lfirst_int(cur);
-        Oid            out_func_oid;
-        bool        isvarlena;
         Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
-        if (cstate->opts.binary)
-            getTypeBinaryOutputInfo(attr->atttypid,
-                                    &out_func_oid,
-                                    &isvarlena);
-        else
-            getTypeOutputInfo(attr->atttypid,
-                              &out_func_oid,
-                              &isvarlena);
-        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+        cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
+                                       &cstate->out_functions[attnum - 1]);
     }
 
     /*
@@ -796,56 +1066,7 @@ DoCopyTo(CopyToState cstate)
                                                "COPY TO",
                                                ALLOCSET_DEFAULT_SIZES);
 
-    if (cstate->opts.binary)
-    {
-        /* Generate header for a binary copy */
-        int32        tmp;
-
-        /* Signature */
-        CopySendData(cstate, BinarySignature, 11);
-        /* Flags field */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-        /* No header extension */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-    }
-    else
-    {
-        /*
-         * For non-binary copy, we need to convert null_print to file
-         * encoding, because it will be sent directly with CopySendString.
-         */
-        if (cstate->need_transcoding)
-            cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-                                                              cstate->opts.null_print_len,
-                                                              cstate->file_encoding);
-
-        /* if a header has been requested send the line */
-        if (cstate->opts.header_line)
-        {
-            bool        hdr_delim = false;
-
-            foreach(cur, cstate->attnumlist)
-            {
-                int            attnum = lfirst_int(cur);
-                char       *colname;
-
-                if (hdr_delim)
-                    CopySendChar(cstate, cstate->opts.delim[0]);
-                hdr_delim = true;
-
-                colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, colname, false);
-                else
-                    CopyAttributeOutText(cstate, colname);
-            }
-
-            CopySendEndOfRow(cstate);
-        }
-    }
+    cstate->routine->CopyToStart(cstate, tupDesc);
 
     if (cstate->rel)
     {
@@ -884,13 +1105,7 @@ DoCopyTo(CopyToState cstate)
         processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
     }
 
-    if (cstate->opts.binary)
-    {
-        /* Generate trailer for a binary copy */
-        CopySendInt16(cstate, -1);
-        /* Need to flush out the trailer */
-        CopySendEndOfRow(cstate);
-    }
+    cstate->routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -903,74 +1118,18 @@ DoCopyTo(CopyToState cstate)
 /*
  * Emit one row during DoCopyTo().
  */
-static void
+static inline void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-    FmgrInfo   *out_functions = cstate->out_functions;
     MemoryContext oldcontext;
 
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
-    if (cstate->opts.binary)
-    {
-        /* Binary per-tuple header */
-        CopySendInt16(cstate, list_length(cstate->attnumlist));
-    }
-
     /* Make sure the tuple is fully deconstructed */
     slot_getallattrs(slot);
 
-    if (!cstate->opts.binary)
-    {
-        bool        need_delim = false;
-
-        foreach_int(attnum, cstate->attnumlist)
-        {
-            Datum        value = slot->tts_values[attnum - 1];
-            bool        isnull = slot->tts_isnull[attnum - 1];
-            char       *string;
-
-            if (need_delim)
-                CopySendChar(cstate, cstate->opts.delim[0]);
-            need_delim = true;
-
-            if (isnull)
-                CopySendString(cstate, cstate->opts.null_print_client);
-            else
-            {
-                string = OutputFunctionCall(&out_functions[attnum - 1],
-                                            value);
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, string,
-                                        cstate->opts.force_quote_flags[attnum - 1]);
-                else
-                    CopyAttributeOutText(cstate, string);
-            }
-        }
-    }
-    else
-    {
-        foreach_int(attnum, cstate->attnumlist)
-        {
-            Datum        value = slot->tts_values[attnum - 1];
-            bool        isnull = slot->tts_isnull[attnum - 1];
-            bytea       *outputbytes;
-
-            if (isnull)
-                CopySendInt32(cstate, -1);
-            else
-            {
-                outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-                                               value);
-                CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-                CopySendData(cstate, VARDATA(outputbytes),
-                             VARSIZE(outputbytes) - VARHDRSZ);
-            }
-        }
-    }
-
-    CopySendEndOfRow(cstate);
+    cstate->routine->CopyToOneRow(cstate, slot);
 
     MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
new file mode 100644
index 00000000000..be29e3fbdef
--- /dev/null
+++ b/src/include/commands/copyapi.h
@@ -0,0 +1,57 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyapi.h
+ *      API for COPY TO handlers
+ *
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYAPI_H
+#define COPYAPI_H
+
+#include "commands/copy.h"
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+
+/*
+ * API structure for a COPY TO format implementation. Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyToRoutine
+{
+    /*
+     * Set output function information. This callback is called once at the
+     * beginning of COPY TO.
+     *
+     * 'finfo' can be optionally filled to provide the catalog information of
+     * the output function.
+     *
+     * 'atttypid' is the OID of data type used by the relation's attribute.
+     */
+    void        (*CopyToOutFunc) (CopyToState cstate, Oid atttypid,
+                                  FmgrInfo *finfo);
+
+    /*
+     * Start a COPY TO. This callback is called once at the beginning of COPY
+     * TO.
+     *
+     * 'tupDesc' is the tuple descriptor of the relation from where the data
+     * is read.
+     */
+    void        (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc);
+
+    /*
+     * Write one row stored in 'slot' to the destination.
+     */
+    void        (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot);
+
+    /* End a COPY TO. This callback is called once at the end of COPY TO */
+    void        (*CopyToEnd) (CopyToState cstate);
+} CopyToRoutine;
+
+#endif                            /* COPYAPI_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a2644a2e653..1cbb3628857 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -508,6 +508,7 @@ CopyMultiInsertInfo
 CopyOnErrorChoice
 CopySource
 CopyStmt
+CopyToRoutine
 CopyToState
 CopyToStateData
 Cost
-- 
2.47.1

From a4e1392e26f96a645bb327119838830c553a7c69 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 18 Nov 2024 16:32:43 -0800
Subject: [PATCH v29 2/9] Refactor COPY FROM to use format callback functions.

This commit introduces a new CopyFromRoutine struct, which is a set of
callback routines to read tuples in a specific format. It also makes
COPY FROM with the existing formats (text, CSV, and binary) utilize
these format callbacks.

This change is a preliminary step towards making the COPY FROM command
extensible in terms of input formats.

Similar to XXXX, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when reading field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.

Author: Sutou Kouhei
Reviewed-by: Michael Paquier, Tomas Vondra, Masahiko Sawada
Reviewed-by: Junwang Zhao
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com
---
 contrib/file_fdw/file_fdw.c              |   1 -
 src/backend/commands/copyfrom.c          | 190 +++++++--
 src/backend/commands/copyfromparse.c     | 504 +++++++++++++----------
 src/include/commands/copy.h              |   2 -
 src/include/commands/copyapi.h           |  48 ++-
 src/include/commands/copyfrom_internal.h |  13 +-
 src/tools/pgindent/typedefs.list         |   1 +
 7 files changed, 492 insertions(+), 267 deletions(-)

diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 678e754b2b9..323c43dca4a 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -21,7 +21,6 @@
 #include "access/table.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_foreign_table.h"
-#include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 0cbd05f5602..917fa6605ef 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -106,6 +106,145 @@ typedef struct CopyMultiInsertInfo
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+/*
+ * Built-in format-specific routines. One-row callbacks are defined in
+ * copyfromparse.c
+ */
+static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
+                                   Oid *typioparam);
+static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
+static void CopyFromTextLikeEnd(CopyFromState cstate);
+static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                                 FmgrInfo *finfo, Oid *typioparam);
+static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
+static void CopyFromBinaryEnd(CopyFromState cstate);
+
+
+/*
+ * COPY FROM routines for built-in formats.
+ *
+ * CSV and text formats share the same TextLike routines except for the
+ * one-row callback.
+ */
+
+/* text format */
+static const CopyFromRoutine CopyFromRoutineText = {
+    .CopyFromInFunc = CopyFromTextLikeInFunc,
+    .CopyFromStart = CopyFromTextLikeStart,
+    .CopyFromOneRow = CopyFromTextOneRow,
+    .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+/* CSV format */
+static const CopyFromRoutine CopyFromRoutineCSV = {
+    .CopyFromInFunc = CopyFromTextLikeInFunc,
+    .CopyFromStart = CopyFromTextLikeStart,
+    .CopyFromOneRow = CopyFromCSVOneRow,
+    .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+/* binary format */
+static const CopyFromRoutine CopyFromRoutineBinary = {
+    .CopyFromInFunc = CopyFromBinaryInFunc,
+    .CopyFromStart = CopyFromBinaryStart,
+    .CopyFromOneRow = CopyFromBinaryOneRow,
+    .CopyFromEnd = CopyFromBinaryEnd,
+};
+
+/* Return a COPY FROM routine for the given options */
+static const CopyFromRoutine *
+CopyFromGetRoutine(CopyFormatOptions opts)
+{
+    if (opts.csv_mode)
+        return &CopyFromRoutineCSV;
+    else if (opts.binary)
+        return &CopyFromRoutineBinary;
+
+    /* default is text */
+    return &CopyFromRoutineText;
+}
+
+/* Implementation of the start callback for text and CSV formats */
+static void
+CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber    attr_count;
+
+    /*
+     * If encoding conversion is needed, we need another buffer to hold the
+     * converted input data.  Otherwise, we can just point input_buf to the
+     * same buffer as raw_buf.
+     */
+    if (cstate->need_transcoding)
+    {
+        cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+        cstate->input_buf_index = cstate->input_buf_len = 0;
+    }
+    else
+        cstate->input_buf = cstate->raw_buf;
+    cstate->input_reached_eof = false;
+
+    initStringInfo(&cstate->line_buf);
+
+    /*
+     * Create workspace for CopyReadAttributes results; used by CSV and text
+     * format.
+     */
+    attr_count = list_length(cstate->attnumlist);
+    cstate->max_fields = attr_count;
+    cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+/*
+ * Implementation of the infunc callback for text and CSV formats. Assign
+ * the input function data to the given *finfo.
+ */
+static void
+CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
+                       Oid *typioparam)
+{
+    Oid            func_oid;
+
+    getTypeInputInfo(atttypid, &func_oid, typioparam);
+    fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the end callback for text and CSV formats */
+static void
+CopyFromTextLikeEnd(CopyFromState cstate)
+{
+    /* nothing to do */
+}
+
+/* Implementation of the start callback for binary format */
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    /* Read and verify binary header */
+    ReceiveCopyBinaryHeader(cstate);
+}
+
+/*
+ * Implementation of the infunc callback for binary format. Assign
+ * the binary input function to the given *finfo.
+ */
+static void
+CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                     FmgrInfo *finfo, Oid *typioparam)
+{
+    Oid            func_oid;
+
+    getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
+    fmgr_info(func_oid, finfo);
+}
+
+/* Implementation of the end callback for binary format */
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+    /* nothing to do */
+}
+
 /*
  * error context callback for COPY FROM
  *
@@ -1396,7 +1535,6 @@ BeginCopyFrom(ParseState *pstate,
                 num_defaults;
     FmgrInfo   *in_functions;
     Oid           *typioparams;
-    Oid            in_func_oid;
     int           *defmap;
     ExprState **defexprs;
     MemoryContext oldcontext;
@@ -1428,6 +1566,9 @@ BeginCopyFrom(ParseState *pstate,
     /* Extract options from the statement node tree */
     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
 
+    /* Set the format routine */
+    cstate->routine = CopyFromGetRoutine(cstate->opts);
+
     /* Process the target relation */
     cstate->rel = rel;
 
@@ -1583,25 +1724,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->raw_buf_index = cstate->raw_buf_len = 0;
     cstate->raw_reached_eof = false;
 
-    if (!cstate->opts.binary)
-    {
-        /*
-         * If encoding conversion is needed, we need another buffer to hold
-         * the converted input data.  Otherwise, we can just point input_buf
-         * to the same buffer as raw_buf.
-         */
-        if (cstate->need_transcoding)
-        {
-            cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-            cstate->input_buf_index = cstate->input_buf_len = 0;
-        }
-        else
-            cstate->input_buf = cstate->raw_buf;
-        cstate->input_reached_eof = false;
-
-        initStringInfo(&cstate->line_buf);
-    }
-
     initStringInfo(&cstate->attribute_buf);
 
     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1634,13 +1756,9 @@ BeginCopyFrom(ParseState *pstate,
             continue;
 
         /* Fetch the input function and typioparam info */
-        if (cstate->opts.binary)
-            getTypeBinaryInputInfo(att->atttypid,
-                                   &in_func_oid, &typioparams[attnum - 1]);
-        else
-            getTypeInputInfo(att->atttypid,
-                             &in_func_oid, &typioparams[attnum - 1]);
-        fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+        cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                        &in_functions[attnum - 1],
+                                        &typioparams[attnum - 1]);
 
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
@@ -1775,20 +1893,7 @@ BeginCopyFrom(ParseState *pstate,
 
     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-    if (cstate->opts.binary)
-    {
-        /* Read and verify binary header */
-        ReceiveCopyBinaryHeader(cstate);
-    }
-
-    /* create workspace for CopyReadAttributes results */
-    if (!cstate->opts.binary)
-    {
-        AttrNumber    attr_count = list_length(cstate->attnumlist);
-
-        cstate->max_fields = attr_count;
-        cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-    }
+    cstate->routine->CopyFromStart(cstate, tupDesc);
 
     MemoryContextSwitchTo(oldcontext);
 
@@ -1801,6 +1906,9 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+    /* Invoke the end callback */
+    cstate->routine->CopyFromEnd(cstate);
+
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
     {
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index caccdc8563c..65f20d332ee 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -62,7 +62,6 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
-#include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
@@ -140,8 +139,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static bool CopyReadLine(CopyFromState cstate);
-static bool CopyReadLineText(CopyFromState cstate);
+static bool CopyReadLine(CopyFromState cstate, bool is_csv);
+static bool CopyReadLineText(CopyFromState cstate, bool is_csv);
 static int    CopyReadAttributesText(CopyFromState cstate);
 static int    CopyReadAttributesCSV(CopyFromState cstate);
 static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
@@ -740,9 +739,11 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
  * in the relation.
  *
  * NOTE: force_not_null option are not applied to the returned fields.
+ *
+ * We use pg_attribute_always_inline to reduce function call overheads.
  */
-bool
-NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
+static pg_attribute_always_inline bool
+NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
 {
     int            fldct;
     bool        done;
@@ -759,13 +760,17 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
         tupDesc = RelationGetDescr(cstate->rel);
 
         cstate->cur_lineno++;
-        done = CopyReadLine(cstate);
+        done = CopyReadLine(cstate, is_csv);
 
         if (cstate->opts.header_line == COPY_HEADER_MATCH)
         {
             int            fldnum;
 
-            if (cstate->opts.csv_mode)
+            /*
+             * is_csv will be optimized away by compiler, as argument is
+             * constant at caller.
+             */
+            if (is_csv)
                 fldct = CopyReadAttributesCSV(cstate);
             else
                 fldct = CopyReadAttributesText(cstate);
@@ -809,7 +814,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     cstate->cur_lineno++;
 
     /* Actually read the line into memory here */
-    done = CopyReadLine(cstate);
+    done = CopyReadLine(cstate, is_csv);
 
     /*
      * EOF at start of line means we're done.  If we see EOF after some
@@ -819,8 +824,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     if (done && cstate->line_buf.len == 0)
         return false;
 
-    /* Parse the line into de-escaped field values */
-    if (cstate->opts.csv_mode)
+    /*
+     * Parse the line into de-escaped field values
+     *
+     * is_csv will be optimized away by compiler, as argument is constant at
+     * caller.
+     */
+    if (is_csv)
         fldct = CopyReadAttributesCSV(cstate);
     else
         fldct = CopyReadAttributesText(cstate);
@@ -830,6 +840,244 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     return true;
 }
 
+/*
+ * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow().
+ *
+ * We use pg_attribute_always_inline to reduce function call overheads.
+ */
+static pg_attribute_always_inline bool
+CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
+                       Datum *values, bool *nulls, bool is_csv)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    ExprState **defexprs = cstate->defexprs;
+    char      **field_strings;
+    ListCell   *cur;
+    int            fldct;
+    int            fieldno;
+    char       *string;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    /* read raw fields in the next line */
+    if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv))
+        return false;
+
+    /* check for overflowing fields */
+    if (attr_count > 0 && fldct > attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("extra data after last expected column")));
+
+    fieldno = 0;
+
+    /* Loop to read the user attributes on the line. */
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        if (fieldno >= fldct)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("missing data for column \"%s\"",
+                            NameStr(att->attname))));
+        string = field_strings[fieldno++];
+
+        if (cstate->convert_select_flags &&
+            !cstate->convert_select_flags[m])
+        {
+            /* ignore input field, leaving column as NULL */
+            continue;
+        }
+
+        if (is_csv)
+        {
+            if (string == NULL &&
+                cstate->opts.force_notnull_flags[m])
+            {
+                /*
+                 * FORCE_NOT_NULL option is set and column is NULL - convert
+                 * it to the NULL string.
+                 */
+                string = cstate->opts.null_print;
+            }
+            else if (string != NULL && cstate->opts.force_null_flags[m]
+                     && strcmp(string, cstate->opts.null_print) == 0)
+            {
+                /*
+                 * FORCE_NULL option is set and column matches the NULL
+                 * string. It must have been quoted, or otherwise the string
+                 * would already have been set to NULL. Convert it to NULL as
+                 * specified.
+                 */
+                string = NULL;
+            }
+        }
+
+        cstate->cur_attname = NameStr(att->attname);
+        cstate->cur_attval = string;
+
+        if (string != NULL)
+            nulls[m] = false;
+
+        if (cstate->defaults[m])
+        {
+            /*
+             * The caller must supply econtext and have switched into the
+             * per-tuple memory context in it.
+             */
+            Assert(econtext != NULL);
+            Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+
+            values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+        }
+
+        /*
+         * If ON_ERROR is specified with IGNORE, skip rows with soft errors
+         */
+        else if (!InputFunctionCallSafe(&in_functions[m],
+                                        string,
+                                        typioparams[m],
+                                        att->atttypmod,
+                                        (Node *) cstate->escontext,
+                                        &values[m]))
+        {
+            Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
+
+            cstate->num_errors++;
+
+            if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+            {
+                /*
+                 * Since we emit line number and column info in the below
+                 * notice message, we suppress error context information other
+                 * than the relation name.
+                 */
+                Assert(!cstate->relname_only);
+                cstate->relname_only = true;
+
+                if (cstate->cur_attval)
+                {
+                    char       *attval;
+
+                    attval = CopyLimitPrintoutLength(cstate->cur_attval);
+                    ereport(NOTICE,
+                            errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
+                                   (unsigned long long) cstate->cur_lineno,
+                                   cstate->cur_attname,
+                                   attval));
+                    pfree(attval);
+                }
+                else
+                    ereport(NOTICE,
+                            errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
+                                   (unsigned long long) cstate->cur_lineno,
+                                   cstate->cur_attname));
+
+                /* reset relname_only */
+                cstate->relname_only = false;
+            }
+
+            return true;
+        }
+
+        cstate->cur_attname = NULL;
+        cstate->cur_attval = NULL;
+    }
+
+    Assert(fieldno == attr_count);
+
+    return true;
+}
+
+/* Implementation of the per-row callback for text format */
+bool
+CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+                   bool *nulls)
+{
+    return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false);
+}
+
+/* Implementation of the per-row callback for CSV format */
+bool
+CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+                  bool *nulls)
+{
+    return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true);
+}
+
+/* Implementation of the per-row callback for binary format */
+bool
+CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+                     bool *nulls)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    int16        fld_count;
+    ListCell   *cur;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    cstate->cur_lineno++;
+
+    if (!CopyGetInt16(cstate, &fld_count))
+    {
+        /* EOF detected (end of file, or protocol-level EOF) */
+        return false;
+    }
+
+    if (fld_count == -1)
+    {
+        /*
+         * Received EOF marker.  Wait for the protocol-level EOF, and complain
+         * if it doesn't come immediately.  In COPY FROM STDIN, this ensures
+         * that we correctly handle CopyFail, if client chooses to send that
+         * now.  When copying from file, we could ignore the rest of the file
+         * like in text mode, but we choose to be consistent with the COPY
+         * FROM STDIN case.
+         */
+        char        dummy;
+
+        if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("received copy data after EOF marker")));
+        return false;
+    }
+
+    if (fld_count != attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("row field count is %d, expected %d",
+                        (int) fld_count, attr_count)));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        cstate->cur_attname = NameStr(att->attname);
+        values[m] = CopyReadBinaryAttribute(cstate,
+                                            &in_functions[m],
+                                            typioparams[m],
+                                            att->atttypmod,
+                                            &nulls[m]);
+        cstate->cur_attname = NULL;
+    }
+
+    return true;
+}
+
 /*
  * Read next tuple from file for COPY FROM. Return false if no more tuples.
  *
@@ -847,216 +1095,22 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 {
     TupleDesc    tupDesc;
     AttrNumber    num_phys_attrs,
-                attr_count,
                 num_defaults = cstate->num_defaults;
-    FmgrInfo   *in_functions = cstate->in_functions;
-    Oid           *typioparams = cstate->typioparams;
     int            i;
     int           *defmap = cstate->defmap;
     ExprState **defexprs = cstate->defexprs;
 
     tupDesc = RelationGetDescr(cstate->rel);
     num_phys_attrs = tupDesc->natts;
-    attr_count = list_length(cstate->attnumlist);
 
     /* Initialize all values for row to NULL */
     MemSet(values, 0, num_phys_attrs * sizeof(Datum));
     MemSet(nulls, true, num_phys_attrs * sizeof(bool));
     MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
 
-    if (!cstate->opts.binary)
-    {
-        char      **field_strings;
-        ListCell   *cur;
-        int            fldct;
-        int            fieldno;
-        char       *string;
-
-        /* read raw fields in the next line */
-        if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-            return false;
-
-        /* check for overflowing fields */
-        if (attr_count > 0 && fldct > attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("extra data after last expected column")));
-
-        fieldno = 0;
-
-        /* Loop to read the user attributes on the line. */
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            if (fieldno >= fldct)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("missing data for column \"%s\"",
-                                NameStr(att->attname))));
-            string = field_strings[fieldno++];
-
-            if (cstate->convert_select_flags &&
-                !cstate->convert_select_flags[m])
-            {
-                /* ignore input field, leaving column as NULL */
-                continue;
-            }
-
-            if (cstate->opts.csv_mode)
-            {
-                if (string == NULL &&
-                    cstate->opts.force_notnull_flags[m])
-                {
-                    /*
-                     * FORCE_NOT_NULL option is set and column is NULL -
-                     * convert it to the NULL string.
-                     */
-                    string = cstate->opts.null_print;
-                }
-                else if (string != NULL && cstate->opts.force_null_flags[m]
-                         && strcmp(string, cstate->opts.null_print) == 0)
-                {
-                    /*
-                     * FORCE_NULL option is set and column matches the NULL
-                     * string. It must have been quoted, or otherwise the
-                     * string would already have been set to NULL. Convert it
-                     * to NULL as specified.
-                     */
-                    string = NULL;
-                }
-            }
-
-            cstate->cur_attname = NameStr(att->attname);
-            cstate->cur_attval = string;
-
-            if (string != NULL)
-                nulls[m] = false;
-
-            if (cstate->defaults[m])
-            {
-                /*
-                 * The caller must supply econtext and have switched into the
-                 * per-tuple memory context in it.
-                 */
-                Assert(econtext != NULL);
-                Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
-                values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
-            }
-
-            /*
-             * If ON_ERROR is specified with IGNORE, skip rows with soft
-             * errors
-             */
-            else if (!InputFunctionCallSafe(&in_functions[m],
-                                            string,
-                                            typioparams[m],
-                                            att->atttypmod,
-                                            (Node *) cstate->escontext,
-                                            &values[m]))
-            {
-                Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
-
-                cstate->num_errors++;
-
-                if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
-                {
-                    /*
-                     * Since we emit line number and column info in the below
-                     * notice message, we suppress error context information
-                     * other than the relation name.
-                     */
-                    Assert(!cstate->relname_only);
-                    cstate->relname_only = true;
-
-                    if (cstate->cur_attval)
-                    {
-                        char       *attval;
-
-                        attval = CopyLimitPrintoutLength(cstate->cur_attval);
-                        ereport(NOTICE,
-                                errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
-                                       (unsigned long long) cstate->cur_lineno,
-                                       cstate->cur_attname,
-                                       attval));
-                        pfree(attval);
-                    }
-                    else
-                        ereport(NOTICE,
-                                errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
-                                       (unsigned long long) cstate->cur_lineno,
-                                       cstate->cur_attname));
-
-                    /* reset relname_only */
-                    cstate->relname_only = false;
-                }
-
-                return true;
-            }
-
-            cstate->cur_attname = NULL;
-            cstate->cur_attval = NULL;
-        }
-
-        Assert(fieldno == attr_count);
-    }
-    else
-    {
-        /* binary */
-        int16        fld_count;
-        ListCell   *cur;
-
-        cstate->cur_lineno++;
-
-        if (!CopyGetInt16(cstate, &fld_count))
-        {
-            /* EOF detected (end of file, or protocol-level EOF) */
-            return false;
-        }
-
-        if (fld_count == -1)
-        {
-            /*
-             * Received EOF marker.  Wait for the protocol-level EOF, and
-             * complain if it doesn't come immediately.  In COPY FROM STDIN,
-             * this ensures that we correctly handle CopyFail, if client
-             * chooses to send that now.  When copying from file, we could
-             * ignore the rest of the file like in text mode, but we choose to
-             * be consistent with the COPY FROM STDIN case.
-             */
-            char        dummy;
-
-            if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("received copy data after EOF marker")));
-            return false;
-        }
-
-        if (fld_count != attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("row field count is %d, expected %d",
-                            (int) fld_count, attr_count)));
-
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            cstate->cur_attname = NameStr(att->attname);
-            values[m] = CopyReadBinaryAttribute(cstate,
-                                                &in_functions[m],
-                                                typioparams[m],
-                                                att->atttypmod,
-                                                &nulls[m]);
-            cstate->cur_attname = NULL;
-        }
-    }
+    /* Get one row from source */
+    if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+        return false;
 
     /*
      * Now compute and insert any defaults available for the columns not
@@ -1087,7 +1141,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
  * in the final value of line_buf.
  */
 static bool
-CopyReadLine(CopyFromState cstate)
+CopyReadLine(CopyFromState cstate, bool is_csv)
 {
     bool        result;
 
@@ -1095,7 +1149,7 @@ CopyReadLine(CopyFromState cstate)
     cstate->line_buf_valid = false;
 
     /* Parse data and transfer into line_buf */
-    result = CopyReadLineText(cstate);
+    result = CopyReadLineText(cstate, is_csv);
 
     if (result)
     {
@@ -1163,7 +1217,7 @@ CopyReadLine(CopyFromState cstate)
  * CopyReadLineText - inner loop of CopyReadLine for text mode
  */
 static bool
-CopyReadLineText(CopyFromState cstate)
+CopyReadLineText(CopyFromState cstate, bool is_csv)
 {
     char       *copy_input_buf;
     int            input_buf_ptr;
@@ -1178,7 +1232,11 @@ CopyReadLineText(CopyFromState cstate)
     char        quotec = '\0';
     char        escapec = '\0';
 
-    if (cstate->opts.csv_mode)
+    /*
+     * is_csv will be optimized away by compiler, as argument is constant at
+     * caller.
+     */
+    if (is_csv)
     {
         quotec = cstate->opts.quote[0];
         escapec = cstate->opts.escape[0];
@@ -1255,7 +1313,11 @@ CopyReadLineText(CopyFromState cstate)
         prev_raw_ptr = input_buf_ptr;
         c = copy_input_buf[input_buf_ptr++];
 
-        if (cstate->opts.csv_mode)
+        /*
+         * is_csv will be optimized away by compiler, as argument is constant
+         * at caller.
+         */
+        if (is_csv)
         {
             /*
              * If character is '\r', we may need to look ahead below.  Force
@@ -1294,7 +1356,7 @@ CopyReadLineText(CopyFromState cstate)
         }
 
         /* Process \r */
-        if (c == '\r' && (!cstate->opts.csv_mode || !in_quote))
+        if (c == '\r' && (!is_csv || !in_quote))
         {
             /* Check for \r\n on first line, _and_ handle \r\n. */
             if (cstate->eol_type == EOL_UNKNOWN ||
@@ -1322,10 +1384,10 @@ CopyReadLineText(CopyFromState cstate)
                     if (cstate->eol_type == EOL_CRNL)
                         ereport(ERROR,
                                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                 !cstate->opts.csv_mode ?
+                                 !is_csv ?
                                  errmsg("literal carriage return found in data") :
                                  errmsg("unquoted carriage return found in data"),
-                                 !cstate->opts.csv_mode ?
+                                 !is_csv ?
                                  errhint("Use \"\\r\" to represent carriage return.") :
                                  errhint("Use quoted CSV field to represent carriage return.")));
 
@@ -1339,10 +1401,10 @@ CopyReadLineText(CopyFromState cstate)
             else if (cstate->eol_type == EOL_NL)
                 ereport(ERROR,
                         (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errmsg("literal carriage return found in data") :
                          errmsg("unquoted carriage return found in data"),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errhint("Use \"\\r\" to represent carriage return.") :
                          errhint("Use quoted CSV field to represent carriage return.")));
             /* If reach here, we have found the line terminator */
@@ -1350,15 +1412,15 @@ CopyReadLineText(CopyFromState cstate)
         }
 
         /* Process \n */
-        if (c == '\n' && (!cstate->opts.csv_mode || !in_quote))
+        if (c == '\n' && (!is_csv || !in_quote))
         {
             if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
                 ereport(ERROR,
                         (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errmsg("literal newline found in data") :
                          errmsg("unquoted newline found in data"),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errhint("Use \"\\n\" to represent newline.") :
                          errhint("Use quoted CSV field to represent newline.")));
             cstate->eol_type = EOL_NL;    /* in case not set yet */
@@ -1370,7 +1432,7 @@ CopyReadLineText(CopyFromState cstate)
          * Process backslash, except in CSV mode where backslash is a normal
          * character.
          */
-        if (c == '\\' && !cstate->opts.csv_mode)
+        if (c == '\\' && !is_csv)
         {
             char        c2;
 
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 06dfdfef721..7bc044e2816 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -107,8 +107,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
 extern void EndCopyFrom(CopyFromState cstate);
 extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
                          Datum *values, bool *nulls);
-extern bool NextCopyFromRawFields(CopyFromState cstate,
-                                  char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
 extern char *CopyLimitPrintoutLength(const char *str);
 
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index be29e3fbdef..51e131e5e8a 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * copyapi.h
- *      API for COPY TO handlers
+ *      API for COPY TO/FROM handlers
  *
  *
  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
@@ -54,4 +54,50 @@ typedef struct CopyToRoutine
     void        (*CopyToEnd) (CopyToState cstate);
 } CopyToRoutine;
 
+/*
+ * API structure for a COPY FROM format implementation.  Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyFromRoutine
+{
+    /*
+     * Set input function information. This callback is called once at the
+     * beginning of COPY FROM.
+     *
+     * 'finfo' can be optionally filled to provide the catalog information of
+     * the input function.
+     *
+     * 'typioparam' can be optionally filled to define the OID of the type to
+     * pass to the input function. 'atttypid' is the OID of data type used by
+     * the relation's attribute.
+     */
+    void        (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid,
+                                   FmgrInfo *finfo, Oid *typioparam);
+
+    /*
+     * Start a COPY FROM. This callback is called once at the beginning of
+     * COPY FROM.
+     *
+     * 'tupDesc' is the tuple descriptor of the relation where the data needs
+     * to be copied.  This can be used for any initialization steps required
+     * by a format.
+     */
+    void        (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+    /*
+     * Read one row from the source and fill *values and *nulls.
+     *
+     * 'econtext' is used to evaluate default expression for each column that
+     * is either not read from the file or is using the DEFAULT option of COPY
+     * FROM.  It is NULL if no default values are used.
+     *
+     * Returns false if there are no more tuples to read.
+     */
+    bool        (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext,
+                                   Datum *values, bool *nulls);
+
+    /* End a COPY FROM. This callback is called once at the end of COPY FROM */
+    void        (*CopyFromEnd) (CopyFromState cstate);
+} CopyFromRoutine;
+
 #endif                            /* COPYAPI_H */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 1d8ac8f62e6..e1affe3dfa7 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -14,7 +14,7 @@
 #ifndef COPYFROM_INTERNAL_H
 #define COPYFROM_INTERNAL_H
 
-#include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
@@ -58,6 +58,9 @@ typedef enum CopyInsertMethod
  */
 typedef struct CopyFromStateData
 {
+    /* format routine */
+    const CopyFromRoutine *routine;
+
     /* low-level state data */
     CopySource    copy_src;        /* type of copy source */
     FILE       *copy_file;        /* used if copy_src == COPY_FILE */
@@ -183,4 +186,12 @@ typedef struct CopyFromStateData
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
+/* One-row callbacks for built-in formats defined in copyfromparse.c */
+extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext,
+                               Datum *values, bool *nulls);
+extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext,
+                              Datum *values, bool *nulls);
+extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
+                                 Datum *values, bool *nulls);
+
 #endif                            /* COPYFROM_INTERNAL_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 1cbb3628857..afdafefeb9b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -497,6 +497,7 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyFormatOptions
+CopyFromRoutine
 CopyFromState
 CopyFromStateData
 CopyHeaderChoice
-- 
2.47.1

From cb4937aed8565e620715e03ae3b469341ab5ae65 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 25 Nov 2024 12:19:15 +0900
Subject: [PATCH v29 3/9] Add support for adding custom COPY TO format

This uses the handler approach like tablesample. The approach creates
an internal function that returns an internal struct. In this case,
a COPY TO handler returns a CopyToRoutine.

This also adds a test module for a custom COPY TO handler.
---
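Note on the handler approach described above: the following is a minimal
stand-alone sketch, not code from this patch. It assumes only what the
commit message states: a handler function returns a pointer to a tagged
struct of callbacks, and the caller checks the tag before using it. All
Demo*/demo_* names are hypothetical and are not PostgreSQL APIs.

```c
#include <stddef.h>

/* Hypothetical tag values; they stand in for NodeTag in this sketch. */
typedef enum DemoTag
{
    DEMO_T_INVALID = 0,
    DEMO_T_COPY_TO_ROUTINE
} DemoTag;

/* Hypothetical callback table; the tag must be the first member. */
typedef struct DemoCopyToRoutine
{
    DemoTag     tag;
    int         (*one_row) (int ncolumns);
} DemoCopyToRoutine;

static int
demo_one_row(int ncolumns)
{
    return ncolumns;            /* pretend we emitted this many columns */
}

/* Allocated statically so that it lives as long as the process. */
static const DemoCopyToRoutine demo_routine = {
    .tag = DEMO_T_COPY_TO_ROUTINE,
    .one_row = demo_one_row,
};

/* A well-behaved handler: returns the callback table as an untyped pointer. */
static const void *
demo_handler(int is_from)
{
    (void) is_from;
    return &demo_routine;
}

/* A misbehaving handler: returns an object that carries the wrong tag. */
static const DemoTag demo_bad_object = DEMO_T_INVALID;

static const void *
demo_bad_handler(int is_from)
{
    (void) is_from;
    return &demo_bad_object;
}

typedef const void *(*DemoHandler) (int is_from);

/* Call the handler and validate its result; returns NULL on a bad result. */
static const DemoCopyToRoutine *
demo_get_routine(DemoHandler handler, int is_from)
{
    const void *result = handler(is_from);

    if (result == NULL ||
        *(const DemoTag *) result != DEMO_T_COPY_TO_ROUTINE)
        return NULL;
    return (const DemoCopyToRoutine *) result;
}
```

In this sketch, demo_get_routine(demo_handler, 0) yields &demo_routine,
while demo_get_routine(demo_bad_handler, 0) yields NULL. That NULL plays
the role of the "did not return a CopyToRoutine struct" error path.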
 src/backend/commands/copy.c                   | 97 ++++++++++++++++---
 src/backend/commands/copyto.c                 | 20 ++--
 src/backend/nodes/Makefile                    |  1 +
 src/backend/nodes/gen_node_support.pl         |  2 +
 src/backend/utils/adt/pseudotypes.c           |  1 +
 src/include/catalog/pg_proc.dat               |  6 ++
 src/include/catalog/pg_type.dat               |  6 ++
 src/include/commands/copy.h                   |  1 +
 src/include/commands/copyapi.h                |  4 +-
 src/include/commands/copyto_internal.h        | 21 ++++
 src/include/nodes/meson.build                 |  1 +
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 src/test/modules/test_copy_format/.gitignore  |  4 +
 src/test/modules/test_copy_format/Makefile    | 23 +++++
 .../expected/test_copy_format.out             | 17 ++++
 src/test/modules/test_copy_format/meson.build | 33 +++++++
 .../test_copy_format/sql/test_copy_format.sql |  5 +
 .../test_copy_format--1.0.sql                 |  8 ++
 .../test_copy_format/test_copy_format.c       | 63 ++++++++++++
 .../test_copy_format/test_copy_format.control |  4 +
 21 files changed, 295 insertions(+), 24 deletions(-)
 mode change 100644 => 100755 src/backend/nodes/gen_node_support.pl
 create mode 100644 src/include/commands/copyto_internal.h
 create mode 100644 src/test/modules/test_copy_format/.gitignore
 create mode 100644 src/test/modules/test_copy_format/Makefile
 create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out
 create mode 100644 src/test/modules/test_copy_format/meson.build
 create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql
 create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql
 create mode 100644 src/test/modules/test_copy_format/test_copy_format.c
 create mode 100644 src/test/modules/test_copy_format/test_copy_format.control

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfca9d9dc29..9500156b163 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -22,7 +22,7 @@
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/pg_authid.h"
-#include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/defrem.h"
 #include "executor/executor.h"
 #include "mb/pg_wchar.h"
@@ -32,6 +32,7 @@
 #include "parser/parse_coerce.h"
 #include "parser/parse_collate.h"
 #include "parser/parse_expr.h"
+#include "parser/parse_func.h"
 #include "parser/parse_relation.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -476,6 +477,79 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate)
     return COPY_LOG_VERBOSITY_DEFAULT;    /* keep compiler quiet */
 }
 
+/*
+ * Process the "format" option.
+ *
+ * This function checks whether the option value is a built-in format such as
+ * "text" and "csv" or not. If the option value isn't a built-in format, this
+ * function finds a COPY format handler that returns a CopyToRoutine (for
+ * is_from == false). If no COPY format handler is found, this function
+ * reports an error.
+ */
+static void
+ProcessCopyOptionFormat(ParseState *pstate,
+                        CopyFormatOptions *opts_out,
+                        bool is_from,
+                        DefElem *defel)
+{
+    char       *format;
+    bool        isBuiltin;
+    Oid            funcargtypes[1];
+    Oid            handlerOid = InvalidOid;
+    Datum        datum;
+    Node       *routine;
+
+    format = defGetString(defel);
+
+    isBuiltin = true;
+    opts_out->csv_mode = false;
+    opts_out->binary = false;
+    /* built-in formats */
+    if (strcmp(format, "text") == 0)
+         /* "csv_mode == false && binary == false" means "text" */ ;
+    else if (strcmp(format, "csv") == 0)
+        opts_out->csv_mode = true;
+    else if (strcmp(format, "binary") == 0)
+        opts_out->binary = true;
+    else
+        isBuiltin = false;
+    if (isBuiltin)
+    {
+        if (!is_from)
+            opts_out->routine = (Node *) CopyToGetBuiltinRoutine(opts_out);
+        return;
+    }
+
+    /* custom format */
+    if (!is_from)
+    {
+        funcargtypes[0] = INTERNALOID;
+        handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
+                                    funcargtypes, true);
+    }
+    if (!OidIsValid(handlerOid))
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY format \"%s\" not recognized", format),
+                 parser_errposition(pstate, defel->location)));
+
+    datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from));
+    routine = (Node *) DatumGetPointer(datum);
+    if (routine == NULL || !IsA(routine, CopyToRoutine))
+        ereport(
+                ERROR,
+                (errcode(
+                         ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY handler function "
+                        "%s(%u) did not return a "
+                        "CopyToRoutine struct",
+                        format, handlerOid),
+                 parser_errposition(
+                                    pstate, defel->location)));
+
+    opts_out->routine = routine;
+}
+
 /*
  * Process the statement option list for COPY.
  *
@@ -519,22 +593,10 @@ ProcessCopyOptions(ParseState *pstate,
 
         if (strcmp(defel->defname, "format") == 0)
         {
-            char       *fmt = defGetString(defel);
-
             if (format_specified)
                 errorConflictingDefElem(defel, pstate);
             format_specified = true;
-            if (strcmp(fmt, "text") == 0)
-                 /* default format */ ;
-            else if (strcmp(fmt, "csv") == 0)
-                opts_out->csv_mode = true;
-            else if (strcmp(fmt, "binary") == 0)
-                opts_out->binary = true;
-            else
-                ereport(ERROR,
-                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                         errmsg("COPY format \"%s\" not recognized", fmt),
-                         parser_errposition(pstate, defel->location)));
+            ProcessCopyOptionFormat(pstate, opts_out, is_from, defel);
         }
         else if (strcmp(defel->defname, "freeze") == 0)
         {
@@ -685,6 +747,13 @@ ProcessCopyOptions(ParseState *pstate,
                      parser_errposition(pstate, defel->location)));
     }
 
+    /* If format option isn't specified, we use a built-in routine. */
+    if (!format_specified)
+    {
+        if (!is_from)
+            opts_out->routine = (Node *) CopyToGetBuiltinRoutine(opts_out);
+    }
+
     /*
      * Check for incompatible options (must do these three before inserting
      * defaults)
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 26c67ddc351..f7f44b368b7 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -150,6 +150,7 @@ static void CopySendInt16(CopyToState cstate, int16 val);
 
 /* text format */
 static const CopyToRoutine CopyToRoutineText = {
+    .type = T_CopyToRoutine,
     .CopyToStart = CopyToTextLikeStart,
     .CopyToOutFunc = CopyToTextLikeOutFunc,
     .CopyToOneRow = CopyToTextOneRow,
@@ -158,6 +159,7 @@ static const CopyToRoutine CopyToRoutineText = {
 
 /* CSV format */
 static const CopyToRoutine CopyToRoutineCSV = {
+    .type = T_CopyToRoutine,
     .CopyToStart = CopyToTextLikeStart,
     .CopyToOutFunc = CopyToTextLikeOutFunc,
     .CopyToOneRow = CopyToCSVOneRow,
@@ -166,23 +168,23 @@ static const CopyToRoutine CopyToRoutineCSV = {
 
 /* binary format */
 static const CopyToRoutine CopyToRoutineBinary = {
+    .type = T_CopyToRoutine,
     .CopyToStart = CopyToBinaryStart,
     .CopyToOutFunc = CopyToBinaryOutFunc,
     .CopyToOneRow = CopyToBinaryOneRow,
     .CopyToEnd = CopyToBinaryEnd,
 };
 
-/* Return a COPY TO routine for the given options */
-static const CopyToRoutine *
-CopyToGetRoutine(CopyFormatOptions opts)
+/* Return a built-in COPY TO routine for the given options */
+const CopyToRoutine *
+CopyToGetBuiltinRoutine(CopyFormatOptions *opts)
 {
-    if (opts.csv_mode)
+    if (opts->csv_mode)
         return &CopyToRoutineCSV;
-    else if (opts.binary)
+    else if (opts->binary)
         return &CopyToRoutineBinary;
-
-    /* default is text */
-    return &CopyToRoutineText;
+    else
+        return &CopyToRoutineText;
 }
 
 /* Implementation of the start callback for text and CSV formats */
@@ -703,7 +705,7 @@ BeginCopyTo(ParseState *pstate,
     ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
 
     /* Set format routine */
-    cstate->routine = CopyToGetRoutine(cstate->opts);
+    cstate->routine = (const CopyToRoutine *) cstate->opts.routine;
 
     /* Process the source/target relation or query */
     if (rel)
diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile
index 66bbad8e6e0..173ee11811c 100644
--- a/src/backend/nodes/Makefile
+++ b/src/backend/nodes/Makefile
@@ -49,6 +49,7 @@ node_headers = \
     access/sdir.h \
     access/tableam.h \
     access/tsmapi.h \
+    commands/copyapi.h \
     commands/event_trigger.h \
     commands/trigger.h \
     executor/tuptable.h \
diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl
old mode 100644
new mode 100755
index 7c012c27f88..5d53d32c4a7
--- a/src/backend/nodes/gen_node_support.pl
+++ b/src/backend/nodes/gen_node_support.pl
@@ -61,6 +61,7 @@ my @all_input_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copyapi.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
@@ -85,6 +86,7 @@ my @nodetag_only_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copyapi.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c
index 317a1f2b282..f2ebc21ca56 100644
--- a/src/backend/utils/adt/pseudotypes.c
+++ b/src/backend/utils/adt/pseudotypes.c
@@ -370,6 +370,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler);
+PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(internal);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 5b8c2ad2a54..b231e7a041e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7803,6 +7803,12 @@
 { oid => '3312', descr => 'I/O',
   proname => 'tsm_handler_out', prorettype => 'cstring',
   proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' },
+{ oid => '8753', descr => 'I/O',
+  proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler',
+  proargtypes => 'cstring', prosrc => 'copy_handler_in' },
+{ oid => '8754', descr => 'I/O',
+  proname => 'copy_handler_out', prorettype => 'cstring',
+  proargtypes => 'copy_handler', prosrc => 'copy_handler_out' },
 { oid => '267', descr => 'I/O',
   proname => 'table_am_handler_in', proisstrict => 'f',
   prorettype => 'table_am_handler', proargtypes => 'cstring',
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index 6dca77e0a22..340e0cd0a8d 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -633,6 +633,12 @@
   typcategory => 'P', typinput => 'tsm_handler_in',
   typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-',
   typalign => 'i' },
+{ oid => '8752',
+  descr => 'pseudo-type for the result of a copy to method function',
+  typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p',
+  typcategory => 'P', typinput => 'copy_handler_in',
+  typoutput => 'copy_handler_out', typreceive => '-', typsend => '-',
+  typalign => 'i' },
 { oid => '269',
   descr => 'pseudo-type for the result of a table AM handler function',
   typname => 'table_am_handler', typlen => '4', typbyval => 't', typtype => 'p',
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 7bc044e2816..2a90b39b6f6 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -87,6 +87,7 @@ typedef struct CopyFormatOptions
     CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
     int64        reject_limit;    /* maximum tolerable number of errors */
     List       *convert_select; /* list of column names (can be NIL) */
+    Node       *routine;        /* CopyToRoutine */
 } CopyFormatOptions;
 
 /* These are private in commands/copy[from|to].c */
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 51e131e5e8a..12e4b1d47a7 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -14,7 +14,7 @@
 #ifndef COPYAPI_H
 #define COPYAPI_H
 
-#include "commands/copy.h"
+#include "commands/copyto_internal.h"
 #include "executor/tuptable.h"
 #include "nodes/execnodes.h"
 
@@ -24,6 +24,8 @@
  */
 typedef struct CopyToRoutine
 {
+    NodeTag        type;
+
     /*
      * Set output function information. This callback is called once at the
      * beginning of COPY TO.
diff --git a/src/include/commands/copyto_internal.h b/src/include/commands/copyto_internal.h
new file mode 100644
index 00000000000..f95d8da8e3e
--- /dev/null
+++ b/src/include/commands/copyto_internal.h
@@ -0,0 +1,21 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyto_internal.h
+ *      Internal definitions for COPY TO command.
+ *
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyto_internal.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYTO_INTERNAL_H
+#define COPYTO_INTERNAL_H
+
+#include "commands/copy.h"
+
+const struct CopyToRoutine *CopyToGetBuiltinRoutine(CopyFormatOptions *opts);
+
+#endif                            /* COPYTO_INTERNAL_H */
diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build
index f3dd5461fef..09f7443195f 100644
--- a/src/include/nodes/meson.build
+++ b/src/include/nodes/meson.build
@@ -11,6 +11,7 @@ node_support_input_i = [
   'access/sdir.h',
   'access/tableam.h',
   'access/tsmapi.h',
+  'commands/copyapi.h',
   'commands/event_trigger.h',
   'commands/trigger.h',
   'executor/tuptable.h',
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index c0d3cf0e14b..33e3a49a4fb 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
           spgist_name_ops \
           test_bloomfilter \
           test_copy_callbacks \
+          test_copy_format \
           test_custom_rmgrs \
           test_ddl_deparse \
           test_dsa \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 4f544a042d4..bf25658793d 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -14,6 +14,7 @@ subdir('spgist_name_ops')
 subdir('ssl_passphrase_callback')
 subdir('test_bloomfilter')
 subdir('test_copy_callbacks')
+subdir('test_copy_format')
 subdir('test_custom_rmgrs')
 subdir('test_ddl_deparse')
 subdir('test_dsa')
diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_copy_format/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile
new file mode 100644
index 00000000000..8497f91624d
--- /dev/null
+++ b/src/test/modules/test_copy_format/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_format/Makefile
+
+MODULE_big = test_copy_format
+OBJS = \
+    $(WIN32RES) \
+    test_copy_format.o
+PGFILEDESC = "test_copy_format - test custom COPY FORMAT"
+
+EXTENSION = test_copy_format
+DATA = test_copy_format--1.0.sql
+
+REGRESS = test_copy_format
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_format
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out
new file mode 100644
index 00000000000..adfe7d1572a
--- /dev/null
+++ b/src/test/modules/test_copy_format/expected/test_copy_format.out
@@ -0,0 +1,17 @@
+CREATE EXTENSION test_copy_format;
+CREATE TABLE public.test (a smallint, b integer, c bigint);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test FROM stdin WITH (FORMAT 'test_copy_format');
+ERROR:  COPY format "test_copy_format" not recognized
+LINE 1: COPY public.test FROM stdin WITH (FORMAT 'test_copy_format')...
+                                          ^
+COPY public.test TO stdout WITH (FORMAT 'test_copy_format');
+NOTICE:  test_copy_format: is_from=false
+NOTICE:  CopyToOutFunc: atttypid=21
+NOTICE:  CopyToOutFunc: atttypid=23
+NOTICE:  CopyToOutFunc: atttypid=20
+NOTICE:  CopyToStart: natts=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToEnd
diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build
new file mode 100644
index 00000000000..4cefe7b709a
--- /dev/null
+++ b/src/test/modules/test_copy_format/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+test_copy_format_sources = files(
+  'test_copy_format.c',
+)
+
+if host_system == 'windows'
+  test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_copy_format',
+    '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',])
+endif
+
+test_copy_format = shared_module('test_copy_format',
+  test_copy_format_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += test_copy_format
+
+test_install_data += files(
+  'test_copy_format.control',
+  'test_copy_format--1.0.sql',
+)
+
+tests += {
+  'name': 'test_copy_format',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'regress': {
+    'sql': [
+      'test_copy_format',
+    ],
+  },
+}
diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql
new file mode 100644
index 00000000000..810b3d8cedc
--- /dev/null
+++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql
@@ -0,0 +1,5 @@
+CREATE EXTENSION test_copy_format;
+CREATE TABLE public.test (a smallint, b integer, c bigint);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test FROM stdin WITH (FORMAT 'test_copy_format');
+COPY public.test TO stdout WITH (FORMAT 'test_copy_format');
diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql b/src/test/modules/test_copy_format/test_copy_format--1.0.sql
new file mode 100644
index 00000000000..d24ea03ce99
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_format" to load this file. \quit
+
+CREATE FUNCTION test_copy_format(internal)
+    RETURNS copy_handler
+    AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c
new file mode 100644
index 00000000000..e064f40473b
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -0,0 +1,63 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_format.c
+ *        Code for testing custom COPY format.
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *        src/test/modules/test_copy_format/test_copy_format.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "commands/copyapi.h"
+#include "commands/defrem.h"
+
+PG_MODULE_MAGIC;
+
+static void
+CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    ereport(NOTICE, (errmsg("CopyToOutFunc: atttypid=%u", atttypid)));
+}
+
+static void
+CopyToStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts)));
+}
+
+static void
+CopyToOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%d", slot->tts_nvalid)));
+}
+
+static void
+CopyToEnd(CopyToState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyToEnd")));
+}
+
+static const CopyToRoutine CopyToRoutineTestCopyFormat = {
+    .type = T_CopyToRoutine,
+    .CopyToOutFunc = CopyToOutFunc,
+    .CopyToStart = CopyToStart,
+    .CopyToOneRow = CopyToOneRow,
+    .CopyToEnd = CopyToEnd,
+};
+
+PG_FUNCTION_INFO_V1(test_copy_format);
+Datum
+test_copy_format(PG_FUNCTION_ARGS)
+{
+    bool        is_from = PG_GETARG_BOOL(0);
+
+    ereport(NOTICE,
+            (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false")));
+
+    PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
+}
diff --git a/src/test/modules/test_copy_format/test_copy_format.control b/src/test/modules/test_copy_format/test_copy_format.control
new file mode 100644
index 00000000000..f05a6362358
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.control
@@ -0,0 +1,4 @@
+comment = 'Test code for custom COPY format'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_format'
+relocatable = true
-- 
2.47.1

From d17d5dae6865de82997a8511fdc097b1c64ccd73 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 25 Nov 2024 13:58:33 +0900
Subject: [PATCH v29 4/9] Export CopyToStateData as private data

It's for custom COPY TO format handlers implemented as extensions.

This only moves code. Nothing changes except the CopyDest enum values.
CopyDest/CopyFrom enum values such as COPY_FILE conflict with each
other, so the CopyDest enum values use a COPY_DEST_ prefix instead of
the COPY_ prefix. For example, COPY_FILE in CopyDest is renamed to
COPY_DEST_FILE.

Note that this isn't enough to implement custom COPY TO format
handlers as extensions. We'll do the following in a subsequent commit:

1. Add an opaque space for custom COPY TO format handler
2. Export CopySendEndOfRow() to flush buffer
---
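Note on the enum rename described above: the following is a small
stand-alone sketch, not code from this patch. It shows why the prefix is
needed once both enums become visible through headers. C enumerators
share a single namespace per scope, so two enums cannot both declare
COPY_FILE. The Demo* type names and demo_copy_dest_name() are
hypothetical.

```c
#include <string.h>

/* Source-side enum keeps the plain COPY_ prefix in this sketch. */
typedef enum DemoCopySource
{
    COPY_FILE,
    COPY_FRONTEND,
    COPY_CALLBACK,
} DemoCopySource;

/*
 * Destination-side enum.  Reusing COPY_FILE here would be a compile error
 * ("redeclaration of enumerator"), hence the COPY_DEST_ prefix.
 */
typedef enum DemoCopyDest
{
    COPY_DEST_FILE,
    COPY_DEST_FRONTEND,
    COPY_DEST_CALLBACK,
} DemoCopyDest;

/* Hypothetical helper: map a destination to a short label. */
static const char *
demo_copy_dest_name(DemoCopyDest dest)
{
    switch (dest)
    {
        case COPY_DEST_FILE:
            return "file";
        case COPY_DEST_FRONTEND:
            return "frontend";
        case COPY_DEST_CALLBACK:
            return "callback";
    }
    return "unknown";           /* keep compiler quiet */
}
```

With distinct prefixes, a single file can include both definitions and
still switch on either enum unambiguously.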
 src/backend/commands/copyto.c          | 77 +++-----------------------
 src/include/commands/copy.h            |  2 +-
 src/include/commands/copyapi.h         |  2 -
 src/include/commands/copyto_internal.h | 64 +++++++++++++++++++++
 4 files changed, 73 insertions(+), 72 deletions(-)

diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index f7f44b368b7..91fa46ddf6f 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -36,67 +36,6 @@
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/*
- * Represents the different dest cases we need to worry about at
- * the bottom level
- */
-typedef enum CopyDest
-{
-    COPY_FILE,                    /* to file (or a piped program) */
-    COPY_FRONTEND,                /* to frontend */
-    COPY_CALLBACK,                /* to callback function */
-} CopyDest;
-
-/*
- * This struct contains all the state variables used throughout a COPY TO
- * operation.
- *
- * Multi-byte encodings: all supported client-side encodings encode multi-byte
- * characters by having the first byte's high bit set. Subsequent bytes of the
- * character can have the high bit not set. When scanning data in such an
- * encoding to look for a match to a single-byte (ie ASCII) character, we must
- * use the full pg_encoding_mblen() machinery to skip over multibyte
- * characters, else we might find a false match to a trailing byte. In
- * supported server encodings, there is no possibility of a false match, and
- * it's faster to make useless comparisons to trailing bytes than it is to
- * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
- * when we have to do it the hard way.
- */
-typedef struct CopyToStateData
-{
-    /* format-specific routines */
-    const CopyToRoutine *routine;
-
-    /* low-level state data */
-    CopyDest    copy_dest;        /* type of copy source/destination */
-    FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
-    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO */
-
-    int            file_encoding;    /* file or remote side's character encoding */
-    bool        need_transcoding;    /* file encoding diff from server? */
-    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
-
-    /* parameters from the COPY command */
-    Relation    rel;            /* relation to copy to */
-    QueryDesc  *queryDesc;        /* executable query to copy from */
-    List       *attnumlist;        /* integer list of attnums to copy */
-    char       *filename;        /* filename, or NULL for STDOUT */
-    bool        is_program;        /* is 'filename' a program to popen? */
-    copy_data_dest_cb data_dest_cb; /* function for writing data */
-
-    CopyFormatOptions opts;
-    Node       *whereClause;    /* WHERE condition (or NULL) */
-
-    /*
-     * Working state
-     */
-    MemoryContext copycontext;    /* per-copy execution context */
-
-    FmgrInfo   *out_functions;    /* lookup info for output functions */
-    MemoryContext rowcontext;    /* per-row evaluation context */
-    uint64        bytes_processed;    /* number of bytes processed so far */
-} CopyToStateData;
-
 /* DestReceiver for COPY (query) TO */
 typedef struct
 {
@@ -406,7 +345,7 @@ SendCopyBegin(CopyToState cstate)
     for (i = 0; i < natts; i++)
         pq_sendint16(&buf, format); /* per-column formats */
     pq_endmessage(&buf);
-    cstate->copy_dest = COPY_FRONTEND;
+    cstate->copy_dest = COPY_DEST_FRONTEND;
 }
 
 static void
@@ -453,7 +392,7 @@ CopySendEndOfRow(CopyToState cstate)
 
     switch (cstate->copy_dest)
     {
-        case COPY_FILE:
+        case COPY_DEST_FILE:
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -487,11 +426,11 @@ CopySendEndOfRow(CopyToState cstate)
                              errmsg("could not write to COPY file: %m")));
             }
             break;
-        case COPY_FRONTEND:
+        case COPY_DEST_FRONTEND:
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
-        case COPY_CALLBACK:
+        case COPY_DEST_CALLBACK:
             cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
             break;
     }
@@ -512,7 +451,7 @@ CopySendTextLikeEndOfRow(CopyToState cstate)
 {
     switch (cstate->copy_dest)
     {
-        case COPY_FILE:
+        case COPY_DEST_FILE:
             /* Default line termination depends on platform */
 #ifndef WIN32
             CopySendChar(cstate, '\n');
@@ -520,7 +459,7 @@ CopySendTextLikeEndOfRow(CopyToState cstate)
             CopySendString(cstate, "\r\n");
 #endif
             break;
-        case COPY_FRONTEND:
+        case COPY_DEST_FRONTEND:
             /* The FE/BE protocol uses \n as newline for all platforms */
             CopySendChar(cstate, '\n');
             break;
@@ -904,12 +843,12 @@ BeginCopyTo(ParseState *pstate,
     /* See Multibyte encoding comment above */
     cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
 
-    cstate->copy_dest = COPY_FILE;    /* default */
+    cstate->copy_dest = COPY_DEST_FILE; /* default */
 
     if (data_dest_cb)
     {
         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
-        cstate->copy_dest = COPY_CALLBACK;
+        cstate->copy_dest = COPY_DEST_CALLBACK;
         cstate->data_dest_cb = data_dest_cb;
     }
     else if (pipe)
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 2a90b39b6f6..ef3dc02c56a 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -90,7 +90,7 @@ typedef struct CopyFormatOptions
     Node       *routine;        /* CopyToRoutine */
 } CopyFormatOptions;
 
-/* These are private in commands/copy[from|to].c */
+/* These are private in commands/copy[from|to]_internal.h */
 typedef struct CopyFromStateData *CopyFromState;
 typedef struct CopyToStateData *CopyToState;
 
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 12e4b1d47a7..5d071b378d6 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -15,8 +15,6 @@
 #define COPYAPI_H
 
 #include "commands/copyto_internal.h"
-#include "executor/tuptable.h"
-#include "nodes/execnodes.h"
 
 /*
  * API structure for a COPY TO format implementation. Note this must be
diff --git a/src/include/commands/copyto_internal.h b/src/include/commands/copyto_internal.h
index f95d8da8e3e..2df53dda8a0 100644
--- a/src/include/commands/copyto_internal.h
+++ b/src/include/commands/copyto_internal.h
@@ -15,6 +15,70 @@
 #define COPYTO_INTERNAL_H
 
 #include "commands/copy.h"
+#include "executor/execdesc.h"
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+
+/*
+ * Represents the different dest cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopyDest
+{
+    COPY_DEST_FILE,                /* to file (or a piped program) */
+    COPY_DEST_FRONTEND,            /* to frontend */
+    COPY_DEST_CALLBACK,            /* to callback function */
+} CopyDest;
+
+/*
+ * This struct contains all the state variables used throughout a COPY TO
+ * operation.
+ *
+ * Multi-byte encodings: all supported client-side encodings encode multi-byte
+ * characters by having the first byte's high bit set. Subsequent bytes of the
+ * character can have the high bit not set. When scanning data in such an
+ * encoding to look for a match to a single-byte (ie ASCII) character, we must
+ * use the full pg_encoding_mblen() machinery to skip over multibyte
+ * characters, else we might find a false match to a trailing byte. In
+ * supported server encodings, there is no possibility of a false match, and
+ * it's faster to make useless comparisons to trailing bytes than it is to
+ * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
+ * when we have to do it the hard way.
+ */
+typedef struct CopyToStateData
+{
+    /* format-specific routines */
+    const struct CopyToRoutine *routine;
+
+    /* low-level state data */
+    CopyDest    copy_dest;        /* type of copy source/destination */
+    FILE       *copy_file;        /* used if copy_dest == COPY_DEST_FILE */
+    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO */
+
+    int            file_encoding;    /* file or remote side's character encoding */
+    bool        need_transcoding;    /* file encoding diff from server? */
+    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
+
+    /* parameters from the COPY command */
+    Relation    rel;            /* relation to copy to */
+    QueryDesc  *queryDesc;        /* executable query to copy from */
+    List       *attnumlist;        /* integer list of attnums to copy */
+    char       *filename;        /* filename, or NULL for STDOUT */
+    bool        is_program;        /* is 'filename' a program to popen? */
+    copy_data_dest_cb data_dest_cb; /* function for writing data */
+
+    CopyFormatOptions opts;
+    Node       *whereClause;    /* WHERE condition (or NULL) */
+
+    /*
+     * Working state
+     */
+    MemoryContext copycontext;    /* per-copy execution context */
+
+    FmgrInfo   *out_functions;    /* lookup info for output functions */
+    MemoryContext rowcontext;    /* per-row evaluation context */
+    uint64        bytes_processed;    /* number of bytes processed so far */
+} CopyToStateData;
 
 const struct CopyToRoutine *CopyToGetBuiltinRoutine(CopyFormatOptions *opts);
 
-- 
2.47.1

From 6cc082a9398998aa37dbc57568ffa784c9cd7625 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 25 Nov 2024 14:01:18 +0900
Subject: [PATCH v29 5/9] Add support for implementing custom COPY TO format as
 extension

* Add CopyToStateData::opaque that can be used to keep data for custom
  COPY TO format implementation
* Export CopySendEndOfRow() to flush data in CopyToStateData::fe_msgbuf
  as CopyToStateFlush()
---
 src/backend/commands/copyto.c          | 12 ++++++++++++
 src/include/commands/copyapi.h         |  2 ++
 src/include/commands/copyto_internal.h |  3 +++
 3 files changed, 17 insertions(+)

diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 91fa46ddf6f..da281f32950 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -442,6 +442,18 @@ CopySendEndOfRow(CopyToState cstate)
     resetStringInfo(fe_msgbuf);
 }
 
+/*
+ * Export CopySendEndOfRow() for extensions. We want to keep
+ * CopySendEndOfRow() as a static function for
+ * optimization. CopySendEndOfRow() calls in this file may be optimized by a
+ * compiler.
+ */
+void
+CopyToStateFlush(CopyToState cstate)
+{
+    CopySendEndOfRow(cstate);
+}
+
 /*
  * Wrapper function of CopySendEndOfRow for text and CSV formats. Sends the
  * the line termination and do common appropriate things for the end of row.
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 5d071b378d6..f8167af4c79 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -54,6 +54,8 @@ typedef struct CopyToRoutine
     void        (*CopyToEnd) (CopyToState cstate);
 } CopyToRoutine;
 
+extern void CopyToStateFlush(CopyToState cstate);
+
 /*
  * API structure for a COPY FROM format implementation.     Note this must be
  * allocated in a server-lifetime manner, typically as a static const struct.
diff --git a/src/include/commands/copyto_internal.h b/src/include/commands/copyto_internal.h
index 2df53dda8a0..4b82372691e 100644
--- a/src/include/commands/copyto_internal.h
+++ b/src/include/commands/copyto_internal.h
@@ -78,6 +78,9 @@ typedef struct CopyToStateData
     FmgrInfo   *out_functions;    /* lookup info for output functions */
     MemoryContext rowcontext;    /* per-row evaluation context */
     uint64        bytes_processed;    /* number of bytes processed so far */
+
+    /* For custom format implementation */
+    void       *opaque;            /* private space */
 } CopyToStateData;
 
 const struct CopyToRoutine *CopyToGetBuiltinRoutine(CopyFormatOptions *opts);
-- 
2.47.1
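
For readers following along, here is a minimal sketch of how a custom COPY TO format might use the new opaque pointer together with a flush call. It does not use the real PostgreSQL headers: MockCopyToState, MyFormatState, mock_flush() and the my_format_*() callbacks are hypothetical stand-ins for CopyToStateData, an extension's private state, CopyToStateFlush() and the CopyToRoutine callbacks, so it shows only the pattern, not the actual API.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for CopyToStateData: only the fields used here. */
typedef struct MockCopyToState
{
    char        buf[64];        /* plays the role of fe_msgbuf */
    size_t      buflen;         /* bytes currently buffered */
    size_t      flushed;        /* total bytes "sent", for demonstration */
    void       *opaque;         /* private space owned by the format */
} MockCopyToState;

/* Hypothetical private state that a custom format keeps in opaque. */
typedef struct MyFormatState
{
    int         rows_written;
} MyFormatState;

/* Stand-in for CopyToStateFlush(): emit the buffer, then reset it. */
static void
mock_flush(MockCopyToState *cstate)
{
    cstate->flushed += cstate->buflen;
    cstate->buflen = 0;
}

/* Start callback: allocate the private state once per COPY. */
static void
my_format_start(MockCopyToState *cstate)
{
    MyFormatState *st = calloc(1, sizeof(MyFormatState));

    assert(st != NULL);
    cstate->opaque = st;
}

/* Per-row callback: buffer the row, flush it, and count it. */
static void
my_format_one_row(MockCopyToState *cstate, const char *row)
{
    MyFormatState *st = cstate->opaque;
    size_t      len = strlen(row);

    assert(len <= sizeof(cstate->buf));
    memcpy(cstate->buf, row, len);
    cstate->buflen = len;
    mock_flush(cstate);
    st->rows_written++;
}

/* End callback: release the private state and report the row count. */
static int
my_format_end(MockCopyToState *cstate)
{
    MyFormatState *st = cstate->opaque;
    int         rows = st->rows_written;

    free(st);
    cstate->opaque = NULL;
    return rows;
}
```

The point of the sketch is that the core state struct never needs to know the layout of the format's private data; it only carries the pointer between callbacks.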

From 8bab77b24f3795ea7c1f5ee16860348998355ccb Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 25 Nov 2024 14:11:55 +0900
Subject: [PATCH v29 6/9] Add support for adding custom COPY FROM format

This uses the same handler for COPY TO and COPY FROM but uses
different routines: CopyToRoutine for COPY TO and CopyFromRoutine for
COPY FROM. PostgreSQL calls a COPY TO/FROM handler with an "is_from"
argument. It's true for COPY FROM and false for COPY TO:

    copy_handler(true) returns CopyFromRoutine
    copy_handler(false) returns CopyToRoutine

This also adds a test module for custom COPY FROM handler.
---
 src/backend/commands/copy.c                   | 60 ++++++++++++-------
 src/backend/commands/copyfrom.c               | 23 +++----
 src/backend/commands/copyfromparse.c          |  2 +-
 src/include/catalog/pg_type.dat               |  2 +-
 src/include/commands/copy.h                   |  2 +-
 src/include/commands/copyapi.h                |  3 +
 src/include/commands/copyfrom_internal.h      |  6 +-
 .../expected/test_copy_format.out             | 10 +++-
 .../test_copy_format/sql/test_copy_format.sql |  1 +
 .../test_copy_format/test_copy_format.c       | 39 +++++++++++-
 10 files changed, 107 insertions(+), 41 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9500156b163..10f80ef3654 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -483,8 +483,8 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate)
  * This function checks whether the option value is a built-in format such as
  * "text" and "csv" or not. If the option value isn't a built-in format, this
  * function finds a COPY format handler that returns a CopyToRoutine (for
- * is_from == false). If no COPY format handler is found, this function
- * reports an error.
+ * is_from == false) or CopyFromRoutine (for is_from == true). If no COPY
+ * format handler is found, this function reports an error.
  */
 static void
 ProcessCopyOptionFormat(ParseState *pstate,
@@ -515,18 +515,17 @@ ProcessCopyOptionFormat(ParseState *pstate,
         isBuiltin = false;
     if (isBuiltin)
     {
-        if (!is_from)
+        if (is_from)
+            opts_out->routine = (Node *) CopyFromGetBuiltinRoutine(opts_out);
+        else
             opts_out->routine = (Node *) CopyToGetBuiltinRoutine(opts_out);
         return;
     }
 
     /* custom format */
-    if (!is_from)
-    {
-        funcargtypes[0] = INTERNALOID;
-        handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
-                                    funcargtypes, true);
-    }
+    funcargtypes[0] = INTERNALOID;
+    handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
+                                funcargtypes, true);
     if (!OidIsValid(handlerOid))
         ereport(ERROR,
                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -535,17 +534,34 @@ ProcessCopyOptionFormat(ParseState *pstate,
 
     datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from));
     routine = (Node *) DatumGetPointer(datum);
-    if (routine == NULL || !IsA(routine, CopyToRoutine))
-        ereport(
-                ERROR,
-                (errcode(
-                         ERRCODE_INVALID_PARAMETER_VALUE),
-                 errmsg("COPY handler function "
-                        "%s(%u) did not return a "
-                        "CopyToRoutine struct",
-                        format, handlerOid),
-                 parser_errposition(
-                                    pstate, defel->location)));
+    if (is_from)
+    {
+        if (routine == NULL || !IsA(routine, CopyFromRoutine))
+            ereport(
+                    ERROR,
+                    (errcode(
+                             ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("COPY handler function "
+                            "%s(%u) did not return a "
+                            "CopyFromRoutine struct",
+                            format, handlerOid),
+                     parser_errposition(
+                                        pstate, defel->location)));
+    }
+    else
+    {
+        if (routine == NULL || !IsA(routine, CopyToRoutine))
+            ereport(
+                    ERROR,
+                    (errcode(
+                             ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("COPY handler function "
+                            "%s(%u) did not return a "
+                            "CopyToRoutine struct",
+                            format, handlerOid),
+                     parser_errposition(
+                                        pstate, defel->location)));
+    }
 
     opts_out->routine = routine;
 }
@@ -750,7 +766,9 @@ ProcessCopyOptions(ParseState *pstate,
     /* If format option isn't specified, we use a built-in routine. */
     if (!format_specified)
     {
-        if (!is_from)
+        if (is_from)
+            opts_out->routine = (Node *) CopyFromGetBuiltinRoutine(opts_out);
+        else
             opts_out->routine = (Node *) CopyToGetBuiltinRoutine(opts_out);
     }
 
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 917fa6605ef..23027a664ec 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -28,8 +28,7 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
-#include "commands/copy.h"
-#include "commands/copyfrom_internal.h"
+#include "commands/copyapi.h"
 #include "commands/progress.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
@@ -129,6 +128,7 @@ static void CopyFromBinaryEnd(CopyFromState cstate);
 
 /* text format */
 static const CopyFromRoutine CopyFromRoutineText = {
+    .type = T_CopyFromRoutine,
     .CopyFromInFunc = CopyFromTextLikeInFunc,
     .CopyFromStart = CopyFromTextLikeStart,
     .CopyFromOneRow = CopyFromTextOneRow,
@@ -137,6 +137,7 @@ static const CopyFromRoutine CopyFromRoutineText = {
 
 /* CSV format */
 static const CopyFromRoutine CopyFromRoutineCSV = {
+    .type = T_CopyFromRoutine,
     .CopyFromInFunc = CopyFromTextLikeInFunc,
     .CopyFromStart = CopyFromTextLikeStart,
     .CopyFromOneRow = CopyFromCSVOneRow,
@@ -145,23 +146,23 @@ static const CopyFromRoutine CopyFromRoutineCSV = {
 
 /* binary format */
 static const CopyFromRoutine CopyFromRoutineBinary = {
+    .type = T_CopyFromRoutine,
     .CopyFromInFunc = CopyFromBinaryInFunc,
     .CopyFromStart = CopyFromBinaryStart,
     .CopyFromOneRow = CopyFromBinaryOneRow,
     .CopyFromEnd = CopyFromBinaryEnd,
 };
 
-/* Return a COPY FROM routine for the given options */
-static const CopyFromRoutine *
-CopyFromGetRoutine(CopyFormatOptions opts)
+/* Return a built-in COPY FROM routine for the given options */
+const CopyFromRoutine *
+CopyFromGetBuiltinRoutine(CopyFormatOptions *opts)
 {
-    if (opts.csv_mode)
+    if (opts->csv_mode)
         return &CopyFromRoutineCSV;
-    else if (opts.binary)
+    else if (opts->binary)
         return &CopyFromRoutineBinary;
-
-    /* default is text */
-    return &CopyFromRoutineText;
+    else
+        return &CopyFromRoutineText;
 }
 
 /* Implementation of the start callback for text and CSV formats */
@@ -1567,7 +1568,7 @@ BeginCopyFrom(ParseState *pstate,
     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
 
     /* Set the format routine */
-    cstate->routine = CopyFromGetRoutine(cstate->opts);
+    cstate->routine = (const CopyFromRoutine *) cstate->opts.routine;
 
     /* Process the target relation */
     cstate->rel = rel;
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 65f20d332ee..4e6683eb9da 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -62,7 +62,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
-#include "commands/copyfrom_internal.h"
+#include "commands/copyapi.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
 #include "libpq/libpq.h"
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index 340e0cd0a8d..63b7d65f982 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -634,7 +634,7 @@
   typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-',
   typalign => 'i' },
 { oid => '8752',
-  descr => 'pseudo-type for the result of a copy to method function',
+  descr => 'pseudo-type for the result of a copy to/from method function',
   typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p',
   typcategory => 'P', typinput => 'copy_handler_in',
   typoutput => 'copy_handler_out', typreceive => '-', typsend => '-',
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index ef3dc02c56a..586d6c0fe2e 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -87,7 +87,7 @@ typedef struct CopyFormatOptions
     CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
     int64        reject_limit;    /* maximum tolerable number of errors */
     List       *convert_select; /* list of column names (can be NIL) */
-    Node       *routine;        /* CopyToRoutine */
+    Node       *routine;        /* CopyToRoutine or CopyFromRoutine */
 } CopyFormatOptions;
 
 /* These are private in commands/copy[from|to]_internal.h */
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index f8167af4c79..bf933069fea 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -15,6 +15,7 @@
 #define COPYAPI_H
 
 #include "commands/copyto_internal.h"
+#include "commands/copyfrom_internal.h"
 
 /*
  * API structure for a COPY TO format implementation. Note this must be
@@ -62,6 +63,8 @@ extern void CopyToStateFlush(CopyToState cstate);
  */
 typedef struct CopyFromRoutine
 {
+    NodeTag        type;
+
     /*
      * Set input function information. This callback is called once at the
      * beginning of COPY FROM.
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index e1affe3dfa7..9b3b8336b67 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -14,7 +14,7 @@
 #ifndef COPYFROM_INTERNAL_H
 #define COPYFROM_INTERNAL_H
 
-#include "commands/copyapi.h"
+#include "commands/copy.h"
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
@@ -59,7 +59,7 @@ typedef enum CopyInsertMethod
 typedef struct CopyFromStateData
 {
     /* format routine */
-    const CopyFromRoutine *routine;
+    const struct CopyFromRoutine *routine;
 
     /* low-level state data */
     CopySource    copy_src;        /* type of copy source */
@@ -194,4 +194,6 @@ extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext,
 extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
                                  Datum *values, bool *nulls);
 
+const struct CopyFromRoutine *CopyFromGetBuiltinRoutine(CopyFormatOptions *opts);
+
 #endif                            /* COPYFROM_INTERNAL_H */
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out
index adfe7d1572a..016893e7026 100644
--- a/src/test/modules/test_copy_format/expected/test_copy_format.out
+++ b/src/test/modules/test_copy_format/expected/test_copy_format.out
@@ -2,9 +2,13 @@ CREATE EXTENSION test_copy_format;
 CREATE TABLE public.test (a smallint, b integer, c bigint);
 INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
 COPY public.test FROM stdin WITH (FORMAT 'test_copy_format');
-ERROR:  COPY format "test_copy_format" not recognized
-LINE 1: COPY public.test FROM stdin WITH (FORMAT 'test_copy_format')...
-                                          ^
+NOTICE:  test_copy_format: is_from=true
+NOTICE:  CopyFromInFunc: atttypid=21
+NOTICE:  CopyFromInFunc: atttypid=23
+NOTICE:  CopyFromInFunc: atttypid=20
+NOTICE:  CopyFromStart: natts=3
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromEnd
 COPY public.test TO stdout WITH (FORMAT 'test_copy_format');
 NOTICE:  test_copy_format: is_from=false
 NOTICE:  CopyToOutFunc: atttypid=21
diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql
index 810b3d8cedc..0dfdfa00080 100644
--- a/src/test/modules/test_copy_format/sql/test_copy_format.sql
+++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql
@@ -2,4 +2,5 @@ CREATE EXTENSION test_copy_format;
 CREATE TABLE public.test (a smallint, b integer, c bigint);
 INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
 COPY public.test FROM stdin WITH (FORMAT 'test_copy_format');
+\.
 COPY public.test TO stdout WITH (FORMAT 'test_copy_format');
diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c
index e064f40473b..f6b105659ab 100644
--- a/src/test/modules/test_copy_format/test_copy_format.c
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -18,6 +18,40 @@
 
 PG_MODULE_MAGIC;
 
+static void
+CopyFromInFunc(CopyFromState cstate, Oid atttypid,
+               FmgrInfo *finfo, Oid *typioparam)
+{
+    ereport(NOTICE, (errmsg("CopyFromInFunc: atttypid=%d", atttypid)));
+}
+
+static void
+CopyFromStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts)));
+}
+
+static bool
+CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    ereport(NOTICE, (errmsg("CopyFromOneRow")));
+    return false;
+}
+
+static void
+CopyFromEnd(CopyFromState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyFromEnd")));
+}
+
+static const CopyFromRoutine CopyFromRoutineTestCopyFormat = {
+    .type = T_CopyFromRoutine,
+    .CopyFromInFunc = CopyFromInFunc,
+    .CopyFromStart = CopyFromStart,
+    .CopyFromOneRow = CopyFromOneRow,
+    .CopyFromEnd = CopyFromEnd,
+};
+
 static void
 CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
 {
@@ -59,5 +93,8 @@ test_copy_format(PG_FUNCTION_ARGS)
     ereport(NOTICE,
             (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false")));
 
-    PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
+    if (is_from)
+        PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat);
+    else
+        PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
 }
-- 
2.47.1
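
To illustrate the dispatch in this patch, the following is a rough, self-contained sketch of one handler serving both directions and the caller validating the returned struct by its leading tag. MockTag, MockCopyToRoutine, MockCopyFromRoutine, mock_handler() and routine_is_valid() are hypothetical names that stand in for NodeTag, the real routine structs, the SQL-callable handler and the IsA() checks in ProcessCopyOptionFormat(); nothing here is the actual PostgreSQL code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the T_CopyToRoutine / T_CopyFromRoutine tags. */
typedef enum MockTag
{
    MOCK_T_COPY_TO = 1,
    MOCK_T_COPY_FROM = 2
} MockTag;

/* Each routine struct starts with its tag, as a Node would. */
typedef struct MockCopyToRoutine
{
    MockTag     type;
} MockCopyToRoutine;

typedef struct MockCopyFromRoutine
{
    MockTag     type;
} MockCopyFromRoutine;

static const MockCopyToRoutine to_routine = {.type = MOCK_T_COPY_TO};
static const MockCopyFromRoutine from_routine = {.type = MOCK_T_COPY_FROM};

/* One handler for both directions: is_from selects the routine. */
static const void *
mock_handler(bool is_from)
{
    if (is_from)
        return &from_routine;
    return &to_routine;
}

/*
 * Caller-side check, analogous to "routine == NULL || !IsA(...)": reject a
 * NULL result or a struct whose leading tag does not match the direction.
 */
static bool
routine_is_valid(const void *routine, bool is_from)
{
    MockTag     expected = is_from ? MOCK_T_COPY_FROM : MOCK_T_COPY_TO;

    if (routine == NULL)
        return false;
    /* A pointer to a struct may be read through a pointer to its first member. */
    return *(const MockTag *) routine == expected;
}
```

A handler that ignores is_from and always returns the COPY TO routine would be caught by this check when used with COPY FROM, which is the reason the tag field is added to CopyFromRoutine here.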

From b96cc379092fd7e177fa8d65aa56796c1b7332be Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 25 Nov 2024 14:19:34 +0900
Subject: [PATCH v29 7/9] Use COPY_SOURCE_ prefix for CopySource enum values

This is for consistency with CopyDest.
---
 src/backend/commands/copyfrom.c          |  4 ++--
 src/backend/commands/copyfromparse.c     | 10 +++++-----
 src/include/commands/copyfrom_internal.h |  6 +++---
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 23027a664ec..3f6b0031d94 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1703,7 +1703,7 @@ BeginCopyFrom(ParseState *pstate,
                             pg_encoding_to_char(GetDatabaseEncoding()))));
     }
 
-    cstate->copy_src = COPY_FILE;    /* default */
+    cstate->copy_src = COPY_SOURCE_FILE;    /* default */
 
     cstate->whereClause = whereClause;
 
@@ -1831,7 +1831,7 @@ BeginCopyFrom(ParseState *pstate,
     if (data_source_cb)
     {
         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
-        cstate->copy_src = COPY_CALLBACK;
+        cstate->copy_src = COPY_SOURCE_CALLBACK;
         cstate->data_source_cb = data_source_cb;
     }
     else if (pipe)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 4e6683eb9da..f7982bf692f 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -170,7 +170,7 @@ ReceiveCopyBegin(CopyFromState cstate)
     for (i = 0; i < natts; i++)
         pq_sendint16(&buf, format); /* per-column formats */
     pq_endmessage(&buf);
-    cstate->copy_src = COPY_FRONTEND;
+    cstate->copy_src = COPY_SOURCE_FRONTEND;
     cstate->fe_msgbuf = makeStringInfo();
     /* We *must* flush here to ensure FE knows it can send. */
     pq_flush();
@@ -238,7 +238,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
 
     switch (cstate->copy_src)
     {
-        case COPY_FILE:
+        case COPY_SOURCE_FILE:
             bytesread = fread(databuf, 1, maxread, cstate->copy_file);
             if (ferror(cstate->copy_file))
                 ereport(ERROR,
@@ -247,7 +247,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
             if (bytesread == 0)
                 cstate->raw_reached_eof = true;
             break;
-        case COPY_FRONTEND:
+        case COPY_SOURCE_FRONTEND:
             while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof)
             {
                 int            avail;
@@ -330,7 +330,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
                 bytesread += avail;
             }
             break;
-        case COPY_CALLBACK:
+        case COPY_SOURCE_CALLBACK:
             bytesread = cstate->data_source_cb(databuf, minread, maxread);
             break;
     }
@@ -1158,7 +1158,7 @@ CopyReadLine(CopyFromState cstate, bool is_csv)
          * after \. up to the protocol end of copy data.  (XXX maybe better
          * not to treat \. as special?)
          */
-        if (cstate->copy_src == COPY_FRONTEND)
+        if (cstate->copy_src == COPY_SOURCE_FRONTEND)
         {
             int            inbytes;
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 9b3b8336b67..3743b11faa4 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -24,9 +24,9 @@
  */
 typedef enum CopySource
 {
-    COPY_FILE,                    /* from file (or a piped program) */
-    COPY_FRONTEND,                /* from frontend */
-    COPY_CALLBACK,                /* from callback function */
+    COPY_SOURCE_FILE,            /* from file (or a piped program) */
+    COPY_SOURCE_FRONTEND,        /* from frontend */
+    COPY_SOURCE_CALLBACK,        /* from callback function */
 } CopySource;
 
 /*
-- 
2.47.1

From b52208e7f5292bcff38c353fbf1bba48a1f429d8 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 25 Nov 2024 14:21:39 +0900
Subject: [PATCH v29 8/9] Add support for implementing custom COPY FROM format
 as extension

* Add CopyFromStateData::opaque that can be used to keep data for
  custom COPY FROM format implementation
* Export CopyGetData() to get the next data as
  CopyFromStateGetData()
---
 src/backend/commands/copyfromparse.c     | 11 +++++++++++
 src/include/commands/copyapi.h           |  2 ++
 src/include/commands/copyfrom_internal.h |  3 +++
 3 files changed, 16 insertions(+)

diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index f7982bf692f..650b6b2382b 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -729,6 +729,17 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
     return copied_bytes;
 }
 
+/*
+ * Export CopyGetData() for extensions. We want to keep CopyGetData() as a
+ * static function for optimization. CopyGetData() calls in this file may be
+ * optimized by a compiler.
+ */
+int
+CopyFromStateGetData(CopyFromState cstate, void *dest, int minread, int maxread)
+{
+    return CopyGetData(cstate, dest, minread, maxread);
+}
+
 /*
  * Read raw fields in the next line for COPY FROM in text or csv mode.
  * Return false if no more lines.
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index bf933069fea..d1a1dbeb178 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -105,4 +105,6 @@ typedef struct CopyFromRoutine
     void        (*CopyFromEnd) (CopyFromState cstate);
 } CopyFromRoutine;
 
+extern int    CopyFromStateGetData(CopyFromState cstate, void *dest, int minread, int maxread);
+
 #endif                            /* COPYAPI_H */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 3743b11faa4..a65bbbc962e 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -181,6 +181,9 @@ typedef struct CopyFromStateData
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 
     uint64        bytes_processed;    /* number of bytes processed so far */
+
+    /* For custom format implementation */
+    void       *opaque;            /* private space */
 } CopyFromStateData;
 
 extern void ReceiveCopyBegin(CopyFromState cstate);
-- 
2.47.1
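
As a rough illustration of how a custom COPY FROM format might consume data through a CopyFromStateGetData()-style call, here is a self-contained sketch that reads a fixed number of bytes while tolerating short reads. MockSource, mock_get_data() and read_exact() are hypothetical; the mock ignores minread and simply returns 0 at end of input, which is a simplifying assumption and not a statement about how CopyGetData() behaves for every source type.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical in-memory input that hands out data in small chunks. */
typedef struct MockSource
{
    const char *data;
    size_t      len;
    size_t      pos;
    size_t      chunk;          /* max bytes per call, simulates short reads */
} MockSource;

/*
 * Stand-in for CopyFromStateGetData(): copy up to maxread bytes into dest
 * and return the number of bytes copied, or 0 at end of input. minread is
 * ignored in this mock.
 */
static int
mock_get_data(MockSource *src, void *dest, int minread, int maxread)
{
    size_t      avail = src->len - src->pos;
    size_t      n = (size_t) maxread;

    (void) minread;
    if (n > avail)
        n = avail;
    if (n > src->chunk)
        n = src->chunk;
    memcpy(dest, src->data + src->pos, n);
    src->pos += n;
    return (int) n;
}

/*
 * Keep calling the reader until nbytes have arrived or input ends.
 * Returns the number of bytes actually read (short only at end of input).
 */
static int
read_exact(MockSource *src, char *dest, int nbytes)
{
    int         total = 0;

    while (total < nbytes)
    {
        int         n = mock_get_data(src, dest + total, 1, nbytes - total);

        if (n == 0)
            break;              /* end of input */
        total += n;
    }
    return total;
}
```

A format with a fixed-size header could call something like read_exact() from its start callback and treat a short result as a truncated-input error.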

From 7496b8bcceb5434a7005fbdf2ecea485f82b9fde Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Wed, 27 Nov 2024 16:23:55 +0900
Subject: [PATCH v29 9/9] Add CopyFromSkipErrorRow() for custom COPY format
 extension

Extensions must call CopyFromSkipErrorRow() when the CopyFromOneRow
callback reports a soft error by errsave(). CopyFromSkipErrorRow()
handles the "ON_ERROR ignore" and "LOG_VERBOSITY verbose" cases.
---
 src/backend/commands/copyfromparse.c          | 82 +++++++++++--------
 src/include/commands/copyapi.h                |  2 +
 .../expected/test_copy_format.out             | 47 +++++++++++
 .../test_copy_format/sql/test_copy_format.sql | 24 ++++++
 .../test_copy_format/test_copy_format.c       | 82 ++++++++++++++++++-
 5 files changed, 199 insertions(+), 38 deletions(-)

diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 650b6b2382b..b016f43a711 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -851,6 +851,51 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool i
     return true;
 }
 
+/*
+ * Call this when you report an error by errsave() in your CopyFromOneRow
+ * callback. This handles "ON_ERROR ignore" and "LOG_VERBOSITY verbose" cases
+ * for you.
+ */
+void
+CopyFromSkipErrorRow(CopyFromState cstate)
+{
+    Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
+
+    cstate->num_errors++;
+
+    if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+    {
+        /*
+         * Since we emit line number and column info in the below notice
+         * message, we suppress error context information other than the
+         * relation name.
+         */
+        Assert(!cstate->relname_only);
+        cstate->relname_only = true;
+
+        if (cstate->cur_attval)
+        {
+            char       *attval;
+
+            attval = CopyLimitPrintoutLength(cstate->cur_attval);
+            ereport(NOTICE,
+                    errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
+                           (unsigned long long) cstate->cur_lineno,
+                           cstate->cur_attname,
+                           attval));
+            pfree(attval);
+        }
+        else
+            ereport(NOTICE,
+                    errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
+                           (unsigned long long) cstate->cur_lineno,
+                           cstate->cur_attname));
+
+        /* reset relname_only */
+        cstate->relname_only = false;
+    }
+}
+
 /*
  * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow().
  *
@@ -959,42 +1004,7 @@ CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
                                         (Node *) cstate->escontext,
                                         &values[m]))
         {
-            Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
-
-            cstate->num_errors++;
-
-            if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
-            {
-                /*
-                 * Since we emit line number and column info in the below
-                 * notice message, we suppress error context information other
-                 * than the relation name.
-                 */
-                Assert(!cstate->relname_only);
-                cstate->relname_only = true;
-
-                if (cstate->cur_attval)
-                {
-                    char       *attval;
-
-                    attval = CopyLimitPrintoutLength(cstate->cur_attval);
-                    ereport(NOTICE,
-                            errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
-                                   (unsigned long long) cstate->cur_lineno,
-                                   cstate->cur_attname,
-                                   attval));
-                    pfree(attval);
-                }
-                else
-                    ereport(NOTICE,
-                            errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
-                                   (unsigned long long) cstate->cur_lineno,
-                                   cstate->cur_attname));
-
-                /* reset relname_only */
-                cstate->relname_only = false;
-            }
-
+            CopyFromSkipErrorRow(cstate);
             return true;
         }
 
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index d1a1dbeb178..389f887b2c1 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -107,4 +107,6 @@ typedef struct CopyFromRoutine
 
 extern int    CopyFromStateGetData(CopyFromState cstate, void *dest, int minread, int maxread);
 
+extern void CopyFromSkipErrorRow(CopyFromState cstate);
+
 #endif                            /* COPYAPI_H */
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out
index 016893e7026..b9a6baa85c0 100644
--- a/src/test/modules/test_copy_format/expected/test_copy_format.out
+++ b/src/test/modules/test_copy_format/expected/test_copy_format.out
@@ -1,6 +1,8 @@
 CREATE EXTENSION test_copy_format;
 CREATE TABLE public.test (a smallint, b integer, c bigint);
 INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+-- 987 is accepted.
+-- 654 is a hard error because ON_ERROR is stop by default.
 COPY public.test FROM stdin WITH (FORMAT 'test_copy_format');
 NOTICE:  test_copy_format: is_from=true
 NOTICE:  CopyFromInFunc: atttypid=21
@@ -8,7 +10,50 @@ NOTICE:  CopyFromInFunc: atttypid=23
 NOTICE:  CopyFromInFunc: atttypid=20
 NOTICE:  CopyFromStart: natts=3
 NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromOneRow
+ERROR:  invalid value: "6"
+CONTEXT:  COPY test, line 2, column a: "6"
+-- 987 is accepted.
+-- 654 is a soft error because ON_ERROR is ignore.
+COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore);
+NOTICE:  test_copy_format: is_from=true
+NOTICE:  CopyFromInFunc: atttypid=21
+NOTICE:  CopyFromInFunc: atttypid=23
+NOTICE:  CopyFromInFunc: atttypid=20
+NOTICE:  CopyFromStart: natts=3
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromOneRow
+NOTICE:  1 row was skipped due to data type incompatibility
 NOTICE:  CopyFromEnd
+-- 987 is accepted.
+-- 654 is a soft error because ON_ERROR is ignore.
+COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore, LOG_VERBOSITY verbose);
+NOTICE:  test_copy_format: is_from=true
+NOTICE:  CopyFromInFunc: atttypid=21
+NOTICE:  CopyFromInFunc: atttypid=23
+NOTICE:  CopyFromInFunc: atttypid=20
+NOTICE:  CopyFromStart: natts=3
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromOneRow
+NOTICE:  skipping row due to data type incompatibility at line 2 for column "a": "6"
+NOTICE:  CopyFromOneRow
+NOTICE:  1 row was skipped due to data type incompatibility
+NOTICE:  CopyFromEnd
+-- 987 is accepted.
+-- 654 is a soft error because ON_ERROR is ignore.
+-- 321 is a hard error.
+COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore);
+NOTICE:  test_copy_format: is_from=true
+NOTICE:  CopyFromInFunc: atttypid=21
+NOTICE:  CopyFromInFunc: atttypid=23
+NOTICE:  CopyFromInFunc: atttypid=20
+NOTICE:  CopyFromStart: natts=3
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromOneRow
+ERROR:  too much lines: 3
+CONTEXT:  COPY test, line 3
 COPY public.test TO stdout WITH (FORMAT 'test_copy_format');
 NOTICE:  test_copy_format: is_from=false
 NOTICE:  CopyToOutFunc: atttypid=21
@@ -18,4 +63,6 @@ NOTICE:  CopyToStart: natts=3
 NOTICE:  CopyToOneRow: tts_nvalid=3
 NOTICE:  CopyToOneRow: tts_nvalid=3
 NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
 NOTICE:  CopyToEnd
diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql
index 0dfdfa00080..86db71bce7f 100644
--- a/src/test/modules/test_copy_format/sql/test_copy_format.sql
+++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql
@@ -1,6 +1,30 @@
 CREATE EXTENSION test_copy_format;
 CREATE TABLE public.test (a smallint, b integer, c bigint);
 INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+-- 987 is accepted.
+-- 654 is a hard error because ON_ERROR is stop by default.
 COPY public.test FROM stdin WITH (FORMAT 'test_copy_format');
+987
+654
+\.
+-- 987 is accepted.
+-- 654 is a soft error because ON_ERROR is ignore.
+COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore);
+987
+654
+\.
+-- 987 is accepted.
+-- 654 is a soft error because ON_ERROR is ignore.
+COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore, LOG_VERBOSITY verbose);
+987
+654
+\.
+-- 987 is accepted.
+-- 654 is a soft error because ON_ERROR is ignore.
+-- 321 is a hard error.
+COPY public.test FROM stdin WITH (FORMAT 'test_copy_format', ON_ERROR ignore);
+987
+654
+321
 \.
 COPY public.test TO stdout WITH (FORMAT 'test_copy_format');
diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c
index f6b105659ab..f0f53838aef 100644
--- a/src/test/modules/test_copy_format/test_copy_format.c
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -32,10 +32,88 @@ CopyFromStart(CopyFromState cstate, TupleDesc tupDesc)
 }
 
 static bool
-CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+CopyFromOneRow(CopyFromState cstate, ExprContext *econtext,
+               Datum *values, bool *nulls)
 {
+    int            n_attributes = list_length(cstate->attnumlist);
+    char       *line;
+    int            line_size = n_attributes + 1;    /* +1 is for new line */
+    int            read_bytes;
+
     ereport(NOTICE, (errmsg("CopyFromOneRow")));
-    return false;
+
+    cstate->cur_lineno++;
+    line = palloc(line_size);
+    read_bytes = CopyFromStateRead(cstate, line, line_size);
+    if (read_bytes == 0)
+        return false;
+    if (read_bytes != line_size)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("one line must be %d bytes: %d",
+                        line_size, read_bytes)));
+
+    if (cstate->cur_lineno == 1)
+    {
+        /* Success */
+        TupleDesc    tupDesc = RelationGetDescr(cstate->rel);
+        ListCell   *cur;
+        int            i = 0;
+
+        foreach(cur, cstate->attnumlist)
+        {
+            int            attnum = lfirst_int(cur);
+            int            m = attnum - 1;
+            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+            if (att->atttypid == INT2OID)
+            {
+                values[i] = Int16GetDatum(line[i] - '0');
+            }
+            else if (att->atttypid == INT4OID)
+            {
+                values[i] = Int32GetDatum(line[i] - '0');
+            }
+            else if (att->atttypid == INT8OID)
+            {
+                values[i] = Int64GetDatum(line[i] - '0');
+            }
+            nulls[i] = false;
+            i++;
+        }
+    }
+    else if (cstate->cur_lineno == 2)
+    {
+        /* Soft error */
+        TupleDesc    tupDesc = RelationGetDescr(cstate->rel);
+        int            attnum = lfirst_int(list_head(cstate->attnumlist));
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+        char        value[2];
+
+        cstate->cur_attname = NameStr(att->attname);
+        value[0] = line[0];
+        value[1] = '\0';
+        cstate->cur_attval = value;
+        errsave((Node *) cstate->escontext,
+                (
+                 errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
+                 errmsg("invalid value: \"%c\"", line[0])));
+        CopyFromSkipErrorRow(cstate);
+        cstate->cur_attname = NULL;
+        cstate->cur_attval = NULL;
+        return true;
+    }
+    else
+    {
+        /* Hard error */
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("too much lines: %llu",
+                        (unsigned long long) cstate->cur_lineno)));
+    }
+
+    return true;
 }
 
 static void
-- 
2.47.1
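
For anyone reading the test module without a PostgreSQL tree at hand, here is a rough standalone sketch (not part of the patch) of the per-row control flow that the test CopyFromOneRow() above follows: each input line is one digit per attribute plus a newline, line 1 is accepted, line 2 is a soft error (skipped only when ON_ERROR is ignore), and any later line is a hard error. All names here (SketchState, sketch_one_row, RowResult) are hypothetical and exist only for illustration. The real callback works on CopyFromState and reports errors through ereport()/errsave() instead of return codes.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical result codes; the real code uses return values plus ereport(). */
typedef enum { ROW_OK, ROW_SKIPPED, ROW_EOF, ROW_ERROR } RowResult;

/* Hypothetical stand-in for the few CopyFromState fields the test touches. */
typedef struct
{
    const char *buf;                /* whole input, e.g. "987\n654\n" */
    size_t      len;                /* number of bytes in buf */
    size_t      pos;                /* read position */
    unsigned long long cur_lineno;  /* incremented before each read */
    int         on_error_ignore;    /* 1 mimics ON_ERROR ignore, 0 mimics stop */
    unsigned long long num_skipped; /* rows skipped as soft errors */
} SketchState;

static RowResult
sketch_one_row(SketchState *st, int natts, long *values)
{
    size_t      line_size;
    size_t      remaining = st->len - st->pos;
    const char *line;

    assert(natts > 0);
    line_size = (size_t) natts + 1; /* +1 is for the newline */

    st->cur_lineno++;
    if (remaining == 0)
        return ROW_EOF;             /* nothing left to read */
    if (remaining < line_size)
        return ROW_ERROR;           /* short line: bad file format */

    line = st->buf + st->pos;
    st->pos += line_size;

    if (st->cur_lineno == 1)
    {
        /* Success: one decimal digit per attribute. */
        for (int i = 0; i < natts; i++)
            values[i] = line[i] - '0';
        return ROW_OK;
    }
    if (st->cur_lineno == 2)
    {
        /* Soft error: fatal under "stop", counted and skipped under "ignore". */
        if (!st->on_error_ignore)
            return ROW_ERROR;
        st->num_skipped++;
        return ROW_SKIPPED;
    }

    /* Hard error: the test format never accepts a third line. */
    return ROW_ERROR;
}
```

Feeding it "987\n654\n" with on_error_ignore set yields ROW_OK, ROW_SKIPPED and then ROW_EOF, which lines up with the three CopyFromOneRow notices in the expected output for the ON_ERROR ignore case.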

