Thread: copy with compression progress n

copy with compression progress n

From
Andreas Pflug
Date:
I've been playing around with COPYing large binary data, and implemented
a COMPRESSION transfer format. Server-side compression saves
significant bandwidth, which may be the major limiting factor when large
amounts of data are involved (i.e. in many cases where COPY TO/FROM
STDIN/STDOUT is used).
In addition, a progress notification can be enabled using a PROGRESS <n>
option, which emits a notice every n lines.

I tested this with a table containing 2000 rows with a highly
compressible bytea column (1.4GB of data, 138MB on disk). Numbers are
as follows (8.2 HEAD psql):

  Command                           Time   Size
  pg_dump -a -F c -t                652s   146MB
  \copy TO /dev/null                322s
  \copy TO /dev/null binary          24s
  \copy TO /dev/null compression    108s
  \copy TO /tmp/file binary          55s   1.4GB
  \copy TO /tmp/file compression    108s   133MB
  \copy TO STDOUT binary | gzip -1   69s   117MB

So plain text COPY has a large overhead for this kind of data compared
to the binary formats. OTOH, copying normal rows WITH BINARY may bloat
the result too. A typical test table gave these numbers:

  COPY:          6014 bytes
  BINARY:       15071 bytes
  COMPRESSION:   2334 bytes

The compression (pg_lzcompress) is less efficient than a binary copy
piped to gzip, as long as the 1.4GB transfer from server to client
isn't limited by network bandwidth. Measured against the 55s binary
copy to a file, pg_lzcompress apparently needs about 53s (108s - 55s)
to compress to 133MB, while gzip needs only about 14s (69s - 55s) for
117MB. It might be worth looking into optimizing that, since
pg_lzcompress is also used in tuptoaster. Still, when network traffic
is involved, it may be better to spend some time on the server to
reduce the data volume (e.g. for Slony, which uses COPY to start a
replication and is likely to be operated over links slower than
1 Gbit/s).

The attached patch implements COPY ... WITH [BINARY] COMPRESSION
(compression implies BINARY). The copy data uses bit 17 of the flag
field to identify compressed data.
The PROGRESS <n> option, which emits a notice every n lines, has a
caveat: when copying TO STDOUT, data transfer stops after the first
notice has been sent. This either means "don't ereport(NOTICE) while
COPYing data to the client" or there is a bug somewhere.

Regards,
Andreas
Index: src/backend/commands/copy.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/commands/copy.c,v
retrieving revision 1.266
diff -c -r1.266 copy.c
*** src/backend/commands/copy.c    26 May 2006 22:50:02 -0000    1.266
--- src/backend/commands/copy.c    31 May 2006 08:52:42 -0000
***************
*** 47,53 ****
  #include "utils/memutils.h"
  #include "utils/relcache.h"
  #include "utils/syscache.h"
!

  #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
  #define OCTVALUE(c) ((c) - '0')
--- 47,53 ----
  #include "utils/memutils.h"
  #include "utils/relcache.h"
  #include "utils/syscache.h"
! #include "utils/pg_lzcompress.h"

  #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
  #define OCTVALUE(c) ((c) - '0')
***************
*** 103,114 ****
--- 103,121 ----
      int            client_encoding;    /* remote side's character encoding */
      bool        need_transcoding;        /* client encoding diff from server? */
      bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
+     bool        do_compress;    /* compress data before writing to output */
+     bool        do_flush;       /* flush fe_msgbuf to copy target file/pipe */
+     bool        use_raw_buf;    /* use raw buffered data for CopyGetData */
      uint64        processed;        /* # of tuples processed */
+     uint64        progress;        /* progress notice each # tuples processed */
+
+     MemoryContext oldcontext;

      /* parameters from the COPY command */
      Relation    rel;            /* relation to copy to or from */
      List       *attnumlist;        /* integer list of attnums to copy */
      bool        binary;            /* binary format? */
+     bool        compression;    /* binary compressed format? */
      bool        oids;            /* include OIDs? */
      bool        csv_mode;        /* Comma Separated Value format? */
      bool        header_line;    /* CSV header line? */
***************
*** 153,162 ****
       * converts it.  Note: we guarantee that there is a \0 at
       * raw_buf[raw_buf_len].
       */
! #define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
      char       *raw_buf;
      int            raw_buf_index;    /* next byte to process */
      int            raw_buf_len;    /* total # of bytes stored */
  } CopyStateData;

  typedef CopyStateData *CopyState;
--- 160,170 ----
       * converts it.  Note: we guarantee that there is a \0 at
       * raw_buf[raw_buf_len].
       */
! #define RAW_BUF_SIZE 65536        /* initially, we palloc RAW_BUF_SIZE+1 bytes */
      char       *raw_buf;
      int            raw_buf_index;    /* next byte to process */
      int            raw_buf_len;    /* total # of bytes stored */
+     int         raw_buf_size;   /* actual raw_buf_size */
  } CopyStateData;

  typedef CopyStateData *CopyState;
***************
*** 260,265 ****
--- 268,276 ----
  static void CopySendEndOfRow(CopyState cstate);
  static int CopyGetData(CopyState cstate, void *databuf,
              int minread, int maxread);
+ static bool CopyLoadRawBuf(CopyState cstate);
+ static int  CopyLoadBuf(CopyState cstate, void *databuf,
+             int minread, int maxread);
  static void CopySendInt32(CopyState cstate, int32 val);
  static bool CopyGetInt32(CopyState cstate, int32 *val);
  static void CopySendInt16(CopyState cstate, int16 val);
***************
*** 409,442 ****
  static void
  CopySendEndOfRow(CopyState cstate)
  {
      StringInfo    fe_msgbuf = cstate->fe_msgbuf;

!     switch (cstate->copy_dest)
      {
!         case COPY_FILE:
!             if (!cstate->binary)
!             {
!                 /* Default line termination depends on platform */
! #ifndef WIN32
!                 CopySendChar(cstate, '\n');
  #else
!                 CopySendString(cstate, "\r\n");
  #endif
!             }

!             (void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
!                           1, cstate->copy_file);
              if (ferror(cstate->copy_file))
                  ereport(ERROR,
                          (errcode_for_file_access(),
                           errmsg("could not write to COPY file: %m")));
              break;
          case COPY_OLD_FE:
!             /* The FE/BE protocol uses \n as newline for all platforms */
!             if (!cstate->binary)
!                 CopySendChar(cstate, '\n');
!
!             if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
              {
                  /* no hope of recovering connection sync, so FATAL */
                  ereport(FATAL,
--- 420,497 ----
  static void
  CopySendEndOfRow(CopyState cstate)
  {
+     PGLZ_Header  *tmp=0;
      StringInfo    fe_msgbuf = cstate->fe_msgbuf;
+
+     void *data;
+     int len;
+     bool writeUncompressed = false;

!     if (!cstate->binary)
      {
!         /* Default line termination depends on platform */
! #ifdef WIN32
!         if (cstate->copy_dest == COPY_FILE)
!             CopySendString(cstate, "\r\n");
!         else CopySendChar(cstate, '\n');
  #else
!             /* The FE/BE protocol uses \n as newline for all platforms */
!             CopySendChar(cstate, '\n');
  #endif
!     }
!
!
!     if (cstate->do_compress)
!     {
!         if (!cstate->do_flush && fe_msgbuf->len < RAW_BUF_SIZE)
!         {
!             /* Wait for some more data until we compress and write out */
!             return;
!         }
!
!         tmp = (PGLZ_Header *) palloc(PGLZ_MAX_OUTPUT(fe_msgbuf->len));
!
! #if 1
!         (void) pglz_compress(fe_msgbuf->data, fe_msgbuf->len, tmp, NULL);
! #else /* simulate non-compressible data, test compression performance */
!         tmp->varsize = fe_msgbuf->len + sizeof(PGLZ_Header);
!         tmp->rawsize = fe_msgbuf->len;
! #endif
!         data = tmp;
!
!         if (PGLZ_IS_COMPRESSED(tmp))
!             len = tmp->varsize;
!         else
!         {
!             /* incompressible data */
!             len = sizeof(PGLZ_Header);
!             writeUncompressed = true;
!         }
!     }
!     else
!     {
!         data = fe_msgbuf->data;
!         len = fe_msgbuf->len;
!     }

!     switch (cstate->copy_dest)
!     {
!         case COPY_FILE:
!             (void) fwrite(data, len, 1, cstate->copy_file);
!             if (ferror(cstate->copy_file))
!                 ereport(ERROR,
!                         (errcode_for_file_access(),
!                          errmsg("could not write to COPY file: %m")));
!             if (writeUncompressed)
!                 (void) fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file);
              if (ferror(cstate->copy_file))
                  ereport(ERROR,
                          (errcode_for_file_access(),
                           errmsg("could not write to COPY file: %m")));
              break;
          case COPY_OLD_FE:
!             if (pq_putbytes(data, len) ||
!                 (writeUncompressed && pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)))
              {
                  /* no hope of recovering connection sync, so FATAL */
                  ereport(FATAL,
***************
*** 445,459 ****
              }
              break;
          case COPY_NEW_FE:
-             /* The FE/BE protocol uses \n as newline for all platforms */
-             if (!cstate->binary)
-                 CopySendChar(cstate, '\n');
-
              /* Dump the accumulated row as one CopyData message */
!             (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
              break;
      }

      /* Reset fe_msgbuf to empty */
      fe_msgbuf->len = 0;
      fe_msgbuf->data[0] = '\0';
--- 500,515 ----
              }
              break;
          case COPY_NEW_FE:
              /* Dump the accumulated row as one CopyData message */
!             (void) pq_putmessage('d', data, len);
!             if (writeUncompressed)
!                 (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
              break;
      }

+     if (tmp)
+         pfree(tmp);
+
      /* Reset fe_msgbuf to empty */
      fe_msgbuf->len = 0;
      fe_msgbuf->data[0] = '\0';
***************
*** 475,480 ****
--- 531,564 ----
  static int
  CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
  {
+     if (cstate->use_raw_buf)
+     {
+         int bytesread = 0;
+
+         while (bytesread < minread)
+         {
+             int nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
+             if (nbytes > maxread-bytesread)
+                 nbytes = maxread-bytesread;
+
+             memcpy((char*)databuf + bytesread, cstate->raw_buf + cstate->raw_buf_index, nbytes);
+             cstate->raw_buf_index += nbytes;
+             bytesread += nbytes;
+
+             if (bytesread >= minread || !CopyLoadRawBuf(cstate))
+                 break;
+         }
+
+         return bytesread;
+     }
+     else
+         return CopyLoadBuf(cstate, databuf, minread, maxread);
+ }
+
+
+ static int
+ CopyLoadBuf(CopyState cstate, void *databuf, int minread, int maxread)
+ {
      int            bytesread = 0;

      switch (cstate->copy_dest)
***************
*** 662,669 ****
      else
          nbytes = 0;                /* no data need be saved */

!     inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
!                           1, RAW_BUF_SIZE - nbytes);
      nbytes += inbytes;
      cstate->raw_buf[nbytes] = '\0';
      cstate->raw_buf_index = 0;
--- 746,816 ----
      else
          nbytes = 0;                /* no data need be saved */

!     if (cstate->do_compress)
!       {
!         PGLZ_Header pglzHdr;
!         inbytes = CopyLoadBuf(cstate, &pglzHdr, sizeof(PGLZ_Header), sizeof(PGLZ_Header));
!
!         if (inbytes != sizeof(PGLZ_Header))
!         {
!             ereport(ERROR,
!                     (errcode(ERRCODE_CONNECTION_FAILURE),
!                      errmsg("not enough data")));
!         }
!         /* make sure raw_buf is big enough */
!         if (cstate->raw_buf_size < pglzHdr.rawsize + nbytes)
!         {
!             char *newbuf;
!             MemoryContext rowContext;
!
!             cstate->raw_buf_size = pglzHdr.rawsize + nbytes;
!
!             /* raw_buf is allocated statement-wide */
!             rowContext = MemoryContextSwitchTo(cstate->oldcontext);
!             newbuf=palloc(cstate->raw_buf_size+1);
!             MemoryContextSwitchTo(rowContext);
!
!             if (nbytes > 0)
!                 memcpy(newbuf, cstate->raw_buf, nbytes);
!
!             pfree(cstate->raw_buf);
!             cstate->raw_buf = newbuf;
!         }
!
!         if (PGLZ_IS_COMPRESSED(&pglzHdr))
!         {
!             PGLZ_Header *tmp = (PGLZ_Header*)palloc(pglzHdr.varsize);
!             memcpy(tmp, &pglzHdr, sizeof(PGLZ_Header));
!
!             inbytes = CopyLoadBuf(cstate, (char*)tmp + sizeof(PGLZ_Header),
!                                   pglzHdr.varsize - sizeof(PGLZ_Header), pglzHdr.varsize - sizeof(PGLZ_Header));
!             if (inbytes != pglzHdr.varsize-sizeof(PGLZ_Header))
!             {
!                 ereport(ERROR,
!                         (errcode(ERRCODE_CONNECTION_FAILURE),
!                          errmsg("not enough data")));
!             }
!             pglz_decompress(tmp, cstate->raw_buf + nbytes);
!             inbytes = pglzHdr.rawsize;
!         }
!         else
!         {
!             /* not compressed */
!             inbytes = CopyLoadBuf(cstate, cstate->raw_buf + nbytes,
!                                   pglzHdr.rawsize, pglzHdr.rawsize);
!             if (inbytes != pglzHdr.rawsize)
!             {
!                 ereport(ERROR,
!                         (errcode(ERRCODE_CONNECTION_FAILURE),
!                          errmsg("not enough data")));
!             }
!         }
!     }
!     else
!     {
!         inbytes = CopyLoadBuf(cstate, cstate->raw_buf + nbytes,
!                               cstate->raw_buf_size - nbytes, cstate->raw_buf_size - nbytes);
!     }
      nbytes += inbytes;
      cstate->raw_buf[nbytes] = '\0';
      cstate->raw_buf_index = 0;
***************
*** 733,738 ****
--- 880,902 ----
                           errmsg("conflicting or redundant options")));
              cstate->binary = intVal(defel->arg);
          }
+         else if (strcmp(defel->defname, "compression") == 0)
+         {
+             if (cstate->compression)
+                 ereport(ERROR,
+                         (errcode(ERRCODE_SYNTAX_ERROR),
+                          errmsg("conflicting or redundant options")));
+             cstate->compression = intVal(defel->arg);
+             cstate->binary = intVal(defel->arg);
+         }
+         else if (strcmp(defel->defname, "progress") == 0)
+         {
+             if (cstate->progress)
+                 ereport(ERROR,
+                         (errcode(ERRCODE_SYNTAX_ERROR),
+                          errmsg("conflicting or redundant options")));
+             cstate->progress = intVal(defel->arg);
+         }
          else if (strcmp(defel->defname, "oids") == 0)
          {
              if (cstate->oids)
***************
*** 1009,1015 ****
      initStringInfo(&cstate->attribute_buf);
      initStringInfo(&cstate->line_buf);
      cstate->line_buf_converted = false;
!     cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
      cstate->raw_buf_index = cstate->raw_buf_len = 0;
      cstate->processed = 0;

--- 1173,1180 ----
      initStringInfo(&cstate->attribute_buf);
      initStringInfo(&cstate->line_buf);
      cstate->line_buf_converted = false;
!     cstate->raw_buf_size=RAW_BUF_SIZE;
!     cstate->raw_buf = (char *) palloc(cstate->raw_buf_size+1);
      cstate->raw_buf_index = cstate->raw_buf_len = 0;
      cstate->processed = 0;

***************
*** 1274,1287 ****
--- 1439,1462 ----

          /* Signature */
          CopySendData(cstate, (char *) BinarySignature, 11);
+
          /* Flags field */
          tmp = 0;
          if (cstate->oids)
              tmp |= (1 << 16);
+         if (cstate->compression)
+             tmp |= (2 << 16);
          CopySendInt32(cstate, tmp);
          /* No header extension */
          tmp = 0;
          CopySendInt32(cstate, tmp);
+
+         if (cstate->compression)
+         {
+             CopySendEndOfRow(cstate);
+             /* from now on, rows will be compressed */
+             cstate->do_compress = true;
+         }
      }
      else
      {
***************
*** 1404,1413 ****
          }

          CopySendEndOfRow(cstate);
-
          MemoryContextSwitchTo(oldcontext);
!
          cstate->processed++;
      }

      heap_endscan(scandesc);
--- 1579,1589 ----
          }

          CopySendEndOfRow(cstate);
          MemoryContextSwitchTo(oldcontext);
!
          cstate->processed++;
+         if (cstate->progress && (cstate->processed % cstate->progress) == 0)
+             ereport(NOTICE, (errmsg("COPY " UINT64_FORMAT, cstate->processed)));
      }

      heap_endscan(scandesc);
***************
*** 1417,1422 ****
--- 1593,1599 ----
          /* Generate trailer for a binary copy */
          CopySendInt16(cstate, -1);
          /* Need to flush out the trailer */
+         cstate->do_flush = true;
          CopySendEndOfRow(cstate);
      }

***************
*** 1563,1570 ****
      int           *defmap;
      ExprState **defexprs;        /* array of default att expressions */
      ExprContext *econtext;        /* used for ExecEvalExpr for default atts */
-     MemoryContext oldcontext = CurrentMemoryContext;
      ErrorContextCallback errcontext;

      tupDesc = RelationGetDescr(cstate->rel);
      attr = tupDesc->attrs;
--- 1740,1747 ----
      int           *defmap;
      ExprState **defexprs;        /* array of default att expressions */
      ExprContext *econtext;        /* used for ExecEvalExpr for default atts */
      ErrorContextCallback errcontext;
+     cstate->oldcontext = CurrentMemoryContext;

      tupDesc = RelationGetDescr(cstate->rel);
      attr = tupDesc->attrs;
***************
*** 1677,1683 ****
                      (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                       errmsg("invalid COPY file header (missing flags)")));
          file_has_oids = (tmp & (1 << 16)) != 0;
!         tmp &= ~(1 << 16);
          if ((tmp >> 16) != 0)
              ereport(ERROR,
                      (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
--- 1854,1861 ----
                      (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                       errmsg("invalid COPY file header (missing flags)")));
          file_has_oids = (tmp & (1 << 16)) != 0;
!         cstate->compression = (tmp & (2 << 16)) != 0;
!         tmp &= ~(3 << 16);
          if ((tmp >> 16) != 0)
              ereport(ERROR,
                      (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
***************
*** 1696,1701 ****
--- 1874,1883 ----
                          (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                           errmsg("invalid COPY file header (wrong length)")));
          }
+
+         if (cstate->compression)
+             cstate->do_compress = true;
+         cstate->use_raw_buf = true;
      }

      if (file_has_oids && cstate->binary)
***************
*** 1913,1919 ****
              HeapTupleSetOid(tuple, loaded_oid);

          /* Triggers and stuff need to be invoked in query context. */
!         MemoryContextSwitchTo(oldcontext);

          skip_tuple = false;

--- 2095,2101 ----
              HeapTupleSetOid(tuple, loaded_oid);

          /* Triggers and stuff need to be invoked in query context. */
!         MemoryContextSwitchTo(cstate->oldcontext);

          skip_tuple = false;

***************
*** 1958,1970 ****
               * tuples inserted by an INSERT command.
               */
              cstate->processed++;
          }
      }

      /* Done, clean up */
      error_context_stack = errcontext.previous;

!     MemoryContextSwitchTo(oldcontext);

      /* Execute AFTER STATEMENT insertion triggers */
      ExecASInsertTriggers(estate, resultRelInfo);
--- 2140,2159 ----
               * tuples inserted by an INSERT command.
               */
              cstate->processed++;
+
+             if (cstate->progress && (cstate->processed % cstate->progress) == 0)
+             {
+                 error_context_stack = errcontext.previous;
+                 ereport(NOTICE, (errmsg("COPY " UINT64_FORMAT, cstate->processed)));
+                 error_context_stack = &errcontext;
+             }
          }
      }

      /* Done, clean up */
      error_context_stack = errcontext.previous;

!     MemoryContextSwitchTo(cstate->oldcontext);

      /* Execute AFTER STATEMENT insertion triggers */
      ExecASInsertTriggers(estate, resultRelInfo);
Index: src/backend/parser/gram.y
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/parser/gram.y,v
retrieving revision 2.545
diff -c -r2.545 gram.y
*** src/backend/parser/gram.y    27 May 2006 17:38:45 -0000    2.545
--- src/backend/parser/gram.y    31 May 2006 08:52:56 -0000
***************
*** 366,372 ****
      CACHE CALLED CASCADE CASCADED CASE CAST CHAIN CHAR_P
      CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
      CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT
!     COMMITTED CONNECTION CONSTRAINT CONSTRAINTS CONVERSION_P CONVERT COPY CREATE CREATEDB
      CREATEROLE CREATEUSER CROSS CSV CURRENT_DATE CURRENT_ROLE CURRENT_TIME
      CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE

--- 366,372 ----
      CACHE CALLED CASCADE CASCADED CASE CAST CHAIN CHAR_P
      CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
      CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT
!     COMMITTED COMPRESSION CONNECTION CONSTRAINT CONSTRAINTS CONVERSION_P CONVERT COPY CREATE CREATEDB
      CREATEROLE CREATEUSER CROSS CSV CURRENT_DATE CURRENT_ROLE CURRENT_TIME
      CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE

***************
*** 408,414 ****

      PARTIAL PASSWORD PLACING POSITION
      PRECISION PRESERVE PREPARE PREPARED PRIMARY
!     PRIOR PRIVILEGES PROCEDURAL PROCEDURE

      QUOTE

--- 408,414 ----

      PARTIAL PASSWORD PLACING POSITION
      PRECISION PRESERVE PREPARE PREPARED PRIMARY
!     PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRESS

      QUOTE

***************
*** 1649,1654 ****
--- 1649,1662 ----
                  {
                      $$ = makeDefElem("binary", (Node *)makeInteger(TRUE));
                  }
+             | COMPRESSION
+                 {
+                     $$ = makeDefElem("compression", (Node *)makeInteger(TRUE));
+                 }
+             | PROGRESS opt_as Iconst
+                 {
+                     $$ = makeDefElem("progress", (Node *)makeInteger($3));
+                 }
              | OIDS
                  {
                      $$ = makeDefElem("oids", (Node *)makeInteger(TRUE));
***************
*** 8369,8374 ****
--- 8377,8383 ----
              | COMMENT
              | COMMIT
              | COMMITTED
+             | COMPRESSION
              | CONNECTION
              | CONSTRAINTS
              | CONVERSION_P
***************
*** 8476,8481 ****
--- 8485,8491 ----
              | PRIVILEGES
              | PROCEDURAL
              | PROCEDURE
+             | PROGRESS
              | QUOTE
              | READ
              | REASSIGN
Index: src/backend/parser/keywords.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/parser/keywords.c,v
retrieving revision 1.171
diff -c -r1.171 keywords.c
*** src/backend/parser/keywords.c    5 Mar 2006 15:58:32 -0000    1.171
--- src/backend/parser/keywords.c    31 May 2006 08:52:57 -0000
***************
*** 83,88 ****
--- 83,89 ----
      {"comment", COMMENT},
      {"commit", COMMIT},
      {"committed", COMMITTED},
+     {"compression", COMPRESSION},
      {"connection", CONNECTION},
      {"constraint", CONSTRAINT},
      {"constraints", CONSTRAINTS},
***************
*** 267,272 ****
--- 268,274 ----
      {"privileges", PRIVILEGES},
      {"procedural", PROCEDURAL},
      {"procedure", PROCEDURE},
+     {"progress", PROGRESS},
      {"quote", QUOTE},
      {"read", READ},
      {"real", REAL},
Index: src/bin/psql/copy.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/bin/psql/copy.c,v
retrieving revision 1.61
diff -c -r1.61 copy.c
*** src/bin/psql/copy.c    26 May 2006 19:51:29 -0000    1.61
--- src/bin/psql/copy.c    31 May 2006 08:53:00 -0000
***************
*** 62,70 ****
--- 62,72 ----
      bool        psql_inout;        /* true = use psql stdin/stdout */
      bool        from;
      bool        binary;
+     bool        compression;
      bool        oids;
      bool        csv_mode;
      bool        header;
+     long         progress;
      char       *delim;
      char       *null;
      char       *quote;
***************
*** 139,145 ****
      }

      result->table = pg_strdup(token);
-
      token = strtokx(NULL, whitespace, ".,()", "\"",
                      0, false, pset.encoding);
      if (!token)
--- 141,146 ----
***************
*** 283,288 ****
--- 284,297 ----
                  result->oids = true;
              else if (pg_strcasecmp(token, "binary") == 0)
                  result->binary = true;
+             else if (pg_strcasecmp(token, "compression") == 0)
+                 result->compression = true;
+             else if (pg_strcasecmp(token, "progress") == 0)
+             {
+                 token = strtokx(NULL, whitespace, NULL, NULL,
+                             0, false, pset.encoding);
+                 result->progress = atol(token);
+             }
              else if (pg_strcasecmp(token, "csv") == 0)
                  result->csv_mode = true;
              else if (pg_strcasecmp(token, "header") == 0)
***************
*** 478,483 ****
--- 487,497 ----
              appendPQExpBuffer(&query, " WITH NULL AS '%s'", options->null);
      }

+     if (options->compression)
+         appendPQExpBuffer(&query, " COMPRESSION");
+     if (options->progress)
+         appendPQExpBuffer(&query, " PROGRESS %ld", options->progress);
+
      if (options->csv_mode)
          appendPQExpBuffer(&query, " CSV");

Index: src/include/utils/pg_lzcompress.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/utils/pg_lzcompress.h,v
retrieving revision 1.11
diff -c -r1.11 pg_lzcompress.h
*** src/include/utils/pg_lzcompress.h    25 May 2005 21:40:42 -0000    1.11
--- src/include/utils/pg_lzcompress.h    31 May 2006 08:53:02 -0000
***************
*** 56,62 ****
   * ----------
   */
  #define PGLZ_IS_COMPRESSED(_lzdata)        ((_lzdata)->varsize !=                \
! e                                         (_lzdata)->rawsize +            e    \
                                                          sizeof(PGLZ_Header))

  /* ----------
--- 56,62 ----
   * ----------
   */
  #define PGLZ_IS_COMPRESSED(_lzdata)        ((_lzdata)->varsize !=                \
!                                          (_lzdata)->rawsize +                \
                                                          sizeof(PGLZ_Header))

  /* ----------

Re: copy with compression progress n

From
Tom Lane
Date:
Andreas Pflug <pgadmin@pse-consulting.de> writes:
> The attached patch implements COPY ... WITH [BINARY] COMPRESSION 
> (compression implies BINARY). The copy data uses bit 17 of the flag 
> field to identify compressed data.

I think this is a pretty horrid idea, because it changes pg_lzcompress
from an unimportant implementation detail into a backup file format
that we have to support till the end of time.  What happens if, say,
we need to abandon pg_lzcompress because we find out it has patent
problems?

It *might* be tolerable if we used gzip instead, but I really don't see
the argument for doing this inside the server at all: piping to gzip
seems like a perfectly acceptable solution, quite possibly with higher
performance than doing it all in a single process (which isn't going
to be able to use more than one CPU).

I don't see the argument for restricting it to binary only, either.
        regards, tom lane


Re: copy with compression progress n

From
Andreas Pflug
Date:
Tom Lane wrote:
> Andreas Pflug <pgadmin@pse-consulting.de> writes:
> 
>>The attached patch implements COPY ... WITH [BINARY] COMPRESSION 
>>(compression implies BINARY). The copy data uses bit 17 of the flag 
>>field to identify compressed data.
> 
> 
> I think this is a pretty horrid idea, because it changes pg_lzcompress
> from an unimportant implementation detail into a backup file format
> that we have to support till the end of time.  What happens if, say,
> we need to abandon pg_lzcompress because we find out it has patent
> problems?
> 
> It *might* be tolerable if we used gzip instead,

I used pg_lzcompress because it's present in the backend. I'm fine with 
any other good compression algorithm.

>  but I really don't see
> the argument for doing this inside the server at all: piping to gzip
> seems like a perfectly acceptable solution,

As I said, that only helps if the result can be piped into gzip 
efficiently. The problem already arises if psql or any other COPY 
client (Slony, pg_dump) is not on the same machine: network bandwidth 
will then limit throughput before the data ever reaches gzip.

> quite possibly with higher
> performance than doing it all in a single process (which isn't going
> to be able to use more than one CPU).

Which is pretty normal for pgsql.

> I don't see the argument for restricting it to binary only, either.

That's not a restriction but a consequence: compressed data is binary. 
Marking it as binary will make it work with older frontends as well, 
as long as they don't try to interpret the data. Actually, all 8.x psql 
versions should work (with COPY TO STDOUT/FROM STDIN, not \copy).

Do you have a comment about the progress notification and its impact on 
copy to stdout?

Regards,
Andreas


Re: copy with compression progress n

From
Tom Lane
Date:
Andreas Pflug <pgadmin@pse-consulting.de> writes:
> Do you have a comment about the progress notification and its impact on 
> copy to stdout?

I didn't bother to comment on it because I think it's useless, as well
as broken for the stdout case.  Anyone who actually sees a use for it
will have to comment on why they want it.
        regards, tom lane


Re: copy progress notification

From
Andreas Pflug
Date:
Tom Lane wrote:
> Andreas Pflug <pgadmin@pse-consulting.de> writes:
> 
>>Do you have a comment about the progress notification and its impact on 
>>copy to stdout?
> 
> 
> I didn't bother to comment on it because I think it's useless,

It's useful to see anything at all, and to be able to estimate how long 
the whole process will take. People might want to know whether they 
should go for a cup of coffee or rather come back the next day...

> as well as broken for the stdout case. 

I know it's broken, but why? Is using ereport when sending copy data 
illegal by design? If not, it's not the feature that's broken but 
something in cvs HEAD.

Regards,
Andreas


Re: copy with compression progress n

From
Hannu Krosing
Date:
On a fine day, Wed, 2006-05-31 at 17:31, Andreas Pflug wrote:
> Tom Lane wrote:
> > Andreas Pflug <pgadmin@pse-consulting.de> writes:
> > 
> >>The attached patch implements COPY ... WITH [BINARY] COMPRESSION 
> >>(compression implies BINARY). The copy data uses bit 17 of the flag 
> >>field to identify compressed data.
> > 
> > 
> > I think this is a pretty horrid idea, because it changes pg_lzcompress
> > from an unimportant implementation detail into a backup file format
> > that we have to support till the end of time.  What happens if, say,
> > we need to abandon pg_lzcompress because we find out it has patent
> > problems?
> > 
> > It *might* be tolerable if we used gzip instead,
> 
> I used pg_lzcompress because it's present in the backend. I'm fine with 
> every other good compression algorithm.
> 
> >  but I really don't see
> > the argument for doing this inside the server at all: piping to gzip
> > seems like a perfectly acceptable solution,
> 
> As I said, this hits only if it is possible to pipe the result into gzip 
> in a performant way. The issue already arises if psql or any other COPY 
> client (slony, pg_dump) is not on the same machine: Network bandwidth 
> will limit throughput.

Maybe devise a way to pipe the COPY result through some external process
(like gzip) on the server side, without requiring shell access there.

To make it secure, the external process should probably be run from a
hardwired directory via chroot.

-- 
----------------
Hannu Krosing
Database Architect
Skype Technologies OÜ
Akadeemia tee 21 F, Tallinn, 12618, Estonia
