Re: Make COPY format extendable: Extract COPY TO format implementations - Mailing list pgsql-hackers

From Sutou Kouhei
Subject Re: Make COPY format extendable: Extract COPY TO format implementations
Date
Msg-id 20240305.171808.667980402249336456.kou@clear-code.com
Whole thread Raw
In response to Re: Make COPY format extendable: Extract COPY TO format implementations  (Michael Paquier <michael@paquier.xyz>)
Responses Re: Make COPY format extendable: Extract COPY TO format implementations
List pgsql-hackers
Hi,

In <Zea4wXxpYaX64e_p@paquier.xyz>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Tue, 5 Mar 2024 15:16:33 +0900,
  Michael Paquier <michael@paquier.xyz> wrote:

> CopyOneRowTo() could do something like that to avoid the extra
> indentation:
> if (cstate->routine)
> {
>     cstate->routine->CopyToOneRow(cstate, slot);
>     MemoryContextSwitchTo(oldcontext);
>     return;
> }

OK. The v17 patch uses this style. Others are same as the
v16.

> I didn't know this trick.  That's indeed nice..  I may use that for
> other stuff to make patches more presentable to the eyes.  And that's
> available as well with `git diff`.

:-)

> If we basically agree about this part, how would the rest work out
> with this set of APIs and the possibility to plug in a custom value
> for FORMAT to do a pg_proc lookup, including an example of how these
> APIs can be used?

I'll send the following patches after this patch is
merged. They are based on the v6 patch[1]:

1. Add copy_handler
   * This also adds a pg_proc lookup for custom FORMAT
   * This also adds a test for copy_handler
2. Export CopyToStateData
   * We need it to implement custom copy TO handler
3. Add needed APIs to implement custom copy TO handler
   * Add CopyToStateData::opaque
   * Export CopySendEndOfRow()
4. Export CopyFromStateData
   * We need it to implement custom copy FROM handler
5. Add needed APIs to implement custom copy FROM handler
   * Add CopyFromStateData::opaque
   * Export CopyReadBinaryData()

[1]
https://www.postgresql.org/message-id/flat/20240124.144936.67229716500876806.kou%40clear-code.com#f1ad092fc5e81fe38d3c376559efd52c


Thanks,
-- 
kou
From a78b8ee88575e2c2873afc3acf3c8c4e535becf0 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 4 Mar 2024 13:52:34 +0900
Subject: [PATCH v17] Add CopyFromRoutine/CopyToRountine

They are for implementing custom COPY FROM/TO format. But this is not
enough to implement custom COPY FROM/TO format yet. We'll export some
APIs to receive/send data and add "format" option to COPY FROM/TO
later.

Existing text/csv/binary format implementations don't use
CopyFromRoutine/CopyToRoutine for now. We have a patch for it but we
defer it. Because there are some mysterious profile results in spite
of we get faster runtimes. See [1] for details.

[1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz

Note that this doesn't change existing text/csv/binary format
implementations.
---
 src/backend/commands/copyfrom.c          |  24 +++++-
 src/backend/commands/copyfromparse.c     |   5 ++
 src/backend/commands/copyto.c            |  25 +++++-
 src/include/commands/copyapi.h           | 100 +++++++++++++++++++++++
 src/include/commands/copyfrom_internal.h |   4 +
 src/tools/pgindent/typedefs.list         |   2 +
 6 files changed, 155 insertions(+), 5 deletions(-)
 create mode 100644 src/include/commands/copyapi.h

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index c3bc897028..9bf2f6497e 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1623,12 +1623,22 @@ BeginCopyFrom(ParseState *pstate,
 
         /* Fetch the input function and typioparam info */
         if (cstate->opts.binary)
+        {
             getTypeBinaryInputInfo(att->atttypid,
                                    &in_func_oid, &typioparams[attnum - 1]);
+            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+        }
+        else if (cstate->routine)
+            cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                            &in_functions[attnum - 1],
+                                            &typioparams[attnum - 1]);
+
         else
+        {
             getTypeInputInfo(att->atttypid,
                              &in_func_oid, &typioparams[attnum - 1]);
-        fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+        }
 
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
@@ -1768,10 +1778,13 @@ BeginCopyFrom(ParseState *pstate,
         /* Read and verify binary header */
         ReceiveCopyBinaryHeader(cstate);
     }
-
-    /* create workspace for CopyReadAttributes results */
-    if (!cstate->opts.binary)
+    else if (cstate->routine)
     {
+        cstate->routine->CopyFromStart(cstate, tupDesc);
+    }
+    else
+    {
+        /* create workspace for CopyReadAttributes results */
         AttrNumber    attr_count = list_length(cstate->attnumlist);
 
         cstate->max_fields = attr_count;
@@ -1789,6 +1802,9 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+    if (cstate->routine)
+        cstate->routine->CopyFromEnd(cstate);
+
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
     {
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 7cacd0b752..8b15080585 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -978,6 +978,11 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 
         Assert(fieldno == attr_count);
     }
+    else if (cstate->routine)
+    {
+        if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+            return false;
+    }
     else
     {
         /* binary */
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 20ffc90363..b4a7c9c8b9 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -24,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
@@ -71,6 +72,9 @@ typedef enum CopyDest
  */
 typedef struct CopyToStateData
 {
+    /* format routine */
+    const CopyToRoutine *routine;
+
     /* low-level state data */
     CopyDest    copy_dest;        /* type of copy source/destination */
     FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
@@ -777,14 +781,22 @@ DoCopyTo(CopyToState cstate)
         Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
         if (cstate->opts.binary)
+        {
             getTypeBinaryOutputInfo(attr->atttypid,
                                     &out_func_oid,
                                     &isvarlena);
+            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+        }
+        else if (cstate->routine)
+            cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
+                                           &cstate->out_functions[attnum - 1]);
         else
+        {
             getTypeOutputInfo(attr->atttypid,
                               &out_func_oid,
                               &isvarlena);
-        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+        }
     }
 
     /*
@@ -811,6 +823,8 @@ DoCopyTo(CopyToState cstate)
         tmp = 0;
         CopySendInt32(cstate, tmp);
     }
+    else if (cstate->routine)
+        cstate->routine->CopyToStart(cstate, tupDesc);
     else
     {
         /*
@@ -892,6 +906,8 @@ DoCopyTo(CopyToState cstate)
         /* Need to flush out the trailer */
         CopySendEndOfRow(cstate);
     }
+    else if (cstate->routine)
+        cstate->routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -916,6 +932,13 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
+    if (cstate->routine)
+    {
+        cstate->routine->CopyToOneRow(cstate, slot);
+        MemoryContextSwitchTo(oldcontext);
+        return;
+    }
+
     if (cstate->opts.binary)
     {
         /* Binary per-tuple header */
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
new file mode 100644
index 0000000000..635c4cbff2
--- /dev/null
+++ b/src/include/commands/copyapi.h
@@ -0,0 +1,100 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyapi.h
+ *      API for COPY TO/FROM handlers
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYAPI_H
+#define COPYAPI_H
+
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/*
+ * API structure for a COPY FROM format implementation.  Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyFromRoutine
+{
+    /*
+     * Called when COPY FROM is started to set up the input functions
+     * associated to the relation's attributes writing to.  `finfo` can be
+     * optionally filled to provide the catalog information of the input
+     * function.  `typioparam` can be optionally filled to define the OID of
+     * the type to pass to the input function.  `atttypid` is the OID of data
+     * type used by the relation's attribute.
+     */
+    void        (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid,
+                                   FmgrInfo *finfo, Oid *typioparam);
+
+    /*
+     * Called when COPY FROM is started.
+     *
+     * `tupDesc` is the tuple descriptor of the relation where the data needs
+     * to be copied.  This can be used for any initialization steps required
+     * by a format.
+     */
+    void        (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+    /*
+     * Copy one row to a set of `values` and `nulls` of size tupDesc->natts.
+     *
+     * 'econtext' is used to evaluate default expression for each column that
+     * is either not read from the file or is using the DEFAULT option of COPY
+     * FROM.  It is NULL if no default values are used.
+     *
+     * Returns false if there are no more tuples to copy.
+     */
+    bool        (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext,
+                                   Datum *values, bool *nulls);
+
+    /* Called when COPY FROM has ended. */
+    void        (*CopyFromEnd) (CopyFromState cstate);
+} CopyFromRoutine;
+
+/*
+ * API structure for a COPY TO format implementation.   Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyToRoutine
+{
+    /*
+     * Called when COPY TO is started to set up the output functions
+     * associated to the relation's attributes reading from.  `finfo` can be
+     * optionally filled.  `atttypid` is the OID of data type used by the
+     * relation's attribute.
+     */
+    void        (*CopyToOutFunc) (CopyToState cstate, Oid atttypid,
+                                  FmgrInfo *finfo);
+
+    /*
+     * Called when COPY TO is started.
+     *
+     * `tupDesc` is the tuple descriptor of the relation from where the data
+     * is read.
+     */
+    void        (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc);
+
+    /*
+     * Copy one row for COPY TO.
+     *
+     * `slot` is the tuple slot where the data is emitted.
+     */
+    void        (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot);
+
+    /* Called when COPY TO has ended */
+    void        (*CopyToEnd) (CopyToState cstate);
+} CopyToRoutine;
+
+#endif                            /* COPYAPI_H */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..509b9e92a1 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -15,6 +15,7 @@
 #define COPYFROM_INTERNAL_H
 
 #include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
@@ -58,6 +59,9 @@ typedef enum CopyInsertMethod
  */
 typedef struct CopyFromStateData
 {
+    /* format routine */
+    const CopyFromRoutine *routine;
+
     /* low-level state data */
     CopySource    copy_src;        /* type of copy source */
     FILE       *copy_file;        /* used if copy_src == COPY_FILE */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ee40a341d3..a5ae161ca5 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -475,6 +475,7 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyFormatOptions
+CopyFromRoutine
 CopyFromState
 CopyFromStateData
 CopyHeaderChoice
@@ -484,6 +485,7 @@ CopyMultiInsertInfo
 CopyOnErrorChoice
 CopySource
 CopyStmt
+CopyToRoutine
 CopyToState
 CopyToStateData
 Cost
-- 
2.43.0


pgsql-hackers by date:

Previous
From: Andrei Lepikhov
Date:
Subject: Re: POC, WIP: OR-clause support for indexes
Next
From: Alexander Kukushkin
Date:
Subject: Re: Infinite loop in XLogPageRead() on standby