I've been playing around with COPYing large binary data, and implemented
a COMPRESSION transfer format. The server-side compression saves
significant bandwidth, which may be the major limiting factor when large
amounts of data are involved (i.e. in many cases where COPY TO/FROM
STDIN/STDOUT is used).
In addition, a progress notification can be enabled using a PROGRESS
<each n lines> option.
I tested this with a table containing 2000 rows with a highly
compressible bytea column (size 1.4GB, on-disk 138MB). Numbers are as
follows (8.2 HEAD psql):
pg_dump -a -F c -t 652s, 146MB
\copy TO /dev/null 322s
\copy TO /dev/null binary 24s
\copy TO /dev/null compression 108s
\copy TO /tmp/file binary 55s, 1.4GB
\copy TO /tmp/file compression 108s, 133MB
\copy TO STDOUT binary|gzip -1 69s, 117MB
So using the plain text copy has a large overhead for text data over
binary formats. OTOH, copying normal rows WITH BINARY may bloat the
result too. A typical test table gave these numbers:
COPY: 6014 Bytes
BINARY: 15071 Bytes
COMPRESSION: 2334 Bytes
The compression (pg_lzcompress) is less efficient than a binary copy
piped to gzip, as long as the data transfer of 1.4GB from server to
client isn't limited by network bandwidth. Apparently, pg_lzcompress
uses 53s to compress to 133MB, while gzip only needs 14s for 117MB.
It might be worth having a look at optimizing that, since it's used in
tuptoaster. Still, when network traffic is involved, it may be better to
have some time spent on the server to reduce data (e.g. for Slony, which
uses COPY to start a replication, and is likely to be operated over
lines <1GBit/s).
The attached patch implements COPY ... WITH [BINARY] COMPRESSION
(compression implies BINARY). The copy data uses bit 17 of the flag
field to identify compressed data.
The PROGRESS <n> option to throw notices every n lines has a caveat: when
copying TO STDOUT, data transfer will cease after the first notice has been
sent. This may either mean "don't ereport(NOTICE) when COPYing data to
the client" or a bug somewhere.
Regards,
Andreas
Index: src/backend/commands/copy.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/commands/copy.c,v
retrieving revision 1.266
diff -c -r1.266 copy.c
*** src/backend/commands/copy.c 26 May 2006 22:50:02 -0000 1.266
--- src/backend/commands/copy.c 31 May 2006 08:52:42 -0000
***************
*** 47,53 ****
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
!
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
--- 47,53 ----
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
! #include "utils/pg_lzcompress.h"
#define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
#define OCTVALUE(c) ((c) - '0')
***************
*** 103,114 ****
--- 103,121 ----
int client_encoding; /* remote side's character encoding */
bool need_transcoding; /* client encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
+ bool do_compress; /* compress data before writing to output */
+ bool do_flush; /* flush fe_msgbuf to copy target file/pipe */
+ bool use_raw_buf; /* use raw buffered data for CopyGetData */
uint64 processed; /* # of tuples processed */
+ uint64 progress; /* progress notice each # tuples processed */
+
+ MemoryContext oldcontext;
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
List *attnumlist; /* integer list of attnums to copy */
bool binary; /* binary format? */
+ bool compression; /* binary compressed format? */
bool oids; /* include OIDs? */
bool csv_mode; /* Comma Separated Value format? */
bool header_line; /* CSV header line? */
***************
*** 153,162 ****
* converts it. Note: we guarantee that there is a \0 at
* raw_buf[raw_buf_len].
*/
! #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
char *raw_buf;
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
} CopyStateData;
typedef CopyStateData *CopyState;
--- 160,170 ----
* converts it. Note: we guarantee that there is a \0 at
* raw_buf[raw_buf_len].
*/
! #define RAW_BUF_SIZE 65536 /* initially, we palloc RAW_BUF_SIZE+1 bytes */
char *raw_buf;
int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */
+ int raw_buf_size; /* actual raw_buf_size */
} CopyStateData;
typedef CopyStateData *CopyState;
***************
*** 260,265 ****
--- 268,276 ----
static void CopySendEndOfRow(CopyState cstate);
static int CopyGetData(CopyState cstate, void *databuf,
int minread, int maxread);
+ static bool CopyLoadRawBuf(CopyState cstate);
+ static int CopyLoadBuf(CopyState cstate, void *databuf,
+ int minread, int maxread);
static void CopySendInt32(CopyState cstate, int32 val);
static bool CopyGetInt32(CopyState cstate, int32 *val);
static void CopySendInt16(CopyState cstate, int16 val);
***************
*** 409,442 ****
static void
CopySendEndOfRow(CopyState cstate)
{
StringInfo fe_msgbuf = cstate->fe_msgbuf;
! switch (cstate->copy_dest)
{
! case COPY_FILE:
! if (!cstate->binary)
! {
! /* Default line termination depends on platform */
! #ifndef WIN32
! CopySendChar(cstate, '\n');
#else
! CopySendString(cstate, "\r\n");
#endif
! }
! (void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
! 1, cstate->copy_file);
if (ferror(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to COPY file: %m")));
break;
case COPY_OLD_FE:
! /* The FE/BE protocol uses \n as newline for all platforms */
! if (!cstate->binary)
! CopySendChar(cstate, '\n');
!
! if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
{
/* no hope of recovering connection sync, so FATAL */
ereport(FATAL,
--- 420,497 ----
static void
CopySendEndOfRow(CopyState cstate)
{
+ PGLZ_Header *tmp=0;
StringInfo fe_msgbuf = cstate->fe_msgbuf;
+
+ void *data;
+ int len;
+ bool writeUncompressed = false;
! if (!cstate->binary)
{
! /* Default line termination depends on platform */
! #ifdef WIN32
! if (cstate->copy_dest == COPY_FILE)
! CopySendString(cstate, "\r\n");
! else
#else
! /* The FE/BE protocol uses \n as newline for all platforms */
! CopySendChar(cstate, '\n');
#endif
! }
!
!
! if (cstate->do_compress)
! {
! if (!cstate->do_flush && fe_msgbuf->len < RAW_BUF_SIZE)
! {
! /* Wait for some more data until we compress and write out */
! return;
! }
!
! tmp = (PGLZ_Header *) palloc(PGLZ_MAX_OUTPUT(fe_msgbuf->len));
!
! #if 1
! (void) pglz_compress(fe_msgbuf->data, fe_msgbuf->len, tmp, NULL);
! #else /* simulate non-compressible data, test compression performance */
! tmp->varsize = fe_msgbuf->len + sizeof(PGLZ_Header);
! tmp->rawsize = fe_msgbuf->len;
! #endif
! data = tmp;
!
! if (PGLZ_IS_COMPRESSED(tmp))
! len = tmp->varsize;
! else
! {
! // incompressible data
! len = sizeof(PGLZ_Header);
! writeUncompressed = true;
! }
! }
! else
! {
! data = fe_msgbuf->data;
! len = fe_msgbuf->len;
! }
! switch (cstate->copy_dest)
! {
! case COPY_FILE:
! (void) fwrite(data, len, 1, cstate->copy_file);
! if (ferror(cstate->copy_file))
! ereport(ERROR,
! (errcode_for_file_access(),
! errmsg("could not write to COPY file: %m")));
! if (writeUncompressed)
! (void) fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file);
if (ferror(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to COPY file: %m")));
break;
case COPY_OLD_FE:
! if (pq_putbytes(data, len) ||
! (writeUncompressed && pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)))
{
/* no hope of recovering connection sync, so FATAL */
ereport(FATAL,
***************
*** 445,459 ****
}
break;
case COPY_NEW_FE:
- /* The FE/BE protocol uses \n as newline for all platforms */
- if (!cstate->binary)
- CopySendChar(cstate, '\n');
-
/* Dump the accumulated row as one CopyData message */
! (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
}
/* Reset fe_msgbuf to empty */
fe_msgbuf->len = 0;
fe_msgbuf->data[0] = '\0';
--- 500,515 ----
}
break;
case COPY_NEW_FE:
/* Dump the accumulated row as one CopyData message */
! (void) pq_putmessage('d', data, len);
! if (writeUncompressed)
! (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
}
+ if (tmp)
+ pfree(tmp);
+
/* Reset fe_msgbuf to empty */
fe_msgbuf->len = 0;
fe_msgbuf->data[0] = '\0';
***************
*** 475,480 ****
--- 531,564 ----
static int
CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
{
+ if (cstate->use_raw_buf)
+ {
+ int bytesread = 0;
+
+ while (bytesread < minread)
+ {
+ int nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
+ if (nbytes > maxread-bytesread)
+ nbytes = maxread-bytesread;
+
+ memcpy((char*)databuf + bytesread, cstate->raw_buf + cstate->raw_buf_index, nbytes);
+ cstate->raw_buf_index += nbytes;
+ bytesread += nbytes;
+
+ if (bytesread >= minread || !CopyLoadRawBuf(cstate))
+ break;
+ }
+
+ return bytesread;
+ }
+ else
+ return CopyLoadBuf(cstate, databuf, minread, maxread);
+ }
+
+
+ static int
+ CopyLoadBuf(CopyState cstate, void *databuf, int minread, int maxread)
+ {
int bytesread = 0;
switch (cstate->copy_dest)
***************
*** 662,669 ****
else
nbytes = 0; /* no data need be saved */
! inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
! 1, RAW_BUF_SIZE - nbytes);
nbytes += inbytes;
cstate->raw_buf[nbytes] = '\0';
cstate->raw_buf_index = 0;
--- 746,816 ----
else
nbytes = 0; /* no data need be saved */
! if (cstate->do_compress)
! {
! PGLZ_Header pglzHdr;
! inbytes = CopyLoadBuf(cstate, &pglzHdr, sizeof(PGLZ_Header), sizeof(PGLZ_Header));
!
! if (inbytes != sizeof(PGLZ_Header))
! {
! ereport(ERROR,
! (errcode(ERRCODE_CONNECTION_FAILURE),
! errmsg("not enough data")));
! }
! /* make sure raw_buf is big enough */
! if (cstate->raw_buf_size < pglzHdr.rawsize + nbytes)
! {
! char *newbuf;
! MemoryContext rowContext;
!
! cstate->raw_buf_size = pglzHdr.rawsize + nbytes;
!
! /* raw_buf is allocated statement-wide */
! rowContext = MemoryContextSwitchTo(cstate->oldcontext);
! newbuf=palloc(cstate->raw_buf_size+1);
! MemoryContextSwitchTo(rowContext);
!
! if (nbytes > 0)
! memcpy(newbuf, cstate->raw_buf, nbytes);
!
! pfree(cstate->raw_buf);
! cstate->raw_buf = newbuf;
! }
!
! if (PGLZ_IS_COMPRESSED(&pglzHdr))
! {
! PGLZ_Header *tmp = (PGLZ_Header*)palloc(pglzHdr.varsize);
! memcpy(tmp, &pglzHdr, sizeof(PGLZ_Header));
!
! inbytes = CopyLoadBuf(cstate, (char*)tmp + sizeof(PGLZ_Header),
! pglzHdr.varsize - sizeof(PGLZ_Header), pglzHdr.varsize - sizeof(PGLZ_Header));
! if (inbytes != pglzHdr.varsize-sizeof(PGLZ_Header))
! {
! ereport(ERROR,
! (errcode(ERRCODE_CONNECTION_FAILURE),
! errmsg("not enough data")));
! }
! pglz_decompress(tmp, cstate->raw_buf + nbytes);
! inbytes = pglzHdr.rawsize;
! }
! else
! {
! /* not compressed */
! inbytes = CopyLoadBuf(cstate, cstate->raw_buf + nbytes,
! pglzHdr.rawsize, pglzHdr.rawsize);
! if (inbytes != pglzHdr.rawsize)
! {
! ereport(ERROR,
! (errcode(ERRCODE_CONNECTION_FAILURE),
! errmsg("not enough data")));
! }
! }
! }
! else
! {
! inbytes = CopyLoadBuf(cstate, cstate->raw_buf + nbytes,
! cstate->raw_buf_size - nbytes, cstate->raw_buf_size - nbytes);
! }
nbytes += inbytes;
cstate->raw_buf[nbytes] = '\0';
cstate->raw_buf_index = 0;
***************
*** 733,738 ****
--- 880,902 ----
errmsg("conflicting or redundant options")));
cstate->binary = intVal(defel->arg);
}
+ else if (strcmp(defel->defname, "compression") == 0)
+ {
+ if (cstate->compression)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ cstate->compression = intVal(defel->arg);
+ cstate->binary = intVal(defel->arg);
+ }
+ else if (strcmp(defel->defname, "progress") == 0)
+ {
+ if (cstate->progress)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ cstate->progress = intVal(defel->arg);
+ }
else if (strcmp(defel->defname, "oids") == 0)
{
if (cstate->oids)
***************
*** 1009,1015 ****
initStringInfo(&cstate->attribute_buf);
initStringInfo(&cstate->line_buf);
cstate->line_buf_converted = false;
! cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
cstate->raw_buf_index = cstate->raw_buf_len = 0;
cstate->processed = 0;
--- 1173,1180 ----
initStringInfo(&cstate->attribute_buf);
initStringInfo(&cstate->line_buf);
cstate->line_buf_converted = false;
! cstate->raw_buf_size=RAW_BUF_SIZE;
! cstate->raw_buf = (char *) palloc(cstate->raw_buf_size+1);
cstate->raw_buf_index = cstate->raw_buf_len = 0;
cstate->processed = 0;
***************
*** 1274,1287 ****
--- 1439,1462 ----
/* Signature */
CopySendData(cstate, (char *) BinarySignature, 11);
+
/* Flags field */
tmp = 0;
if (cstate->oids)
tmp |= (1 << 16);
+ if (cstate->compression)
+ tmp |= (2 << 16);
CopySendInt32(cstate, tmp);
/* No header extension */
tmp = 0;
CopySendInt32(cstate, tmp);
+
+ if (cstate->compression)
+ {
+ CopySendEndOfRow(cstate);
+ /* from now on, rows will be compressed */
+ cstate->do_compress = true;
+ }
}
else
{
***************
*** 1404,1413 ****
}
CopySendEndOfRow(cstate);
-
MemoryContextSwitchTo(oldcontext);
!
cstate->processed++;
}
heap_endscan(scandesc);
--- 1579,1589 ----
}
CopySendEndOfRow(cstate);
MemoryContextSwitchTo(oldcontext);
!
cstate->processed++;
+ if (cstate->progress && (cstate->processed % cstate->progress) == 0)
+ ereport(NOTICE, (errmsg("COPY " UINT64_FORMAT, cstate->processed)));
}
heap_endscan(scandesc);
***************
*** 1417,1422 ****
--- 1593,1599 ----
/* Generate trailer for a binary copy */
CopySendInt16(cstate, -1);
/* Need to flush out the trailer */
+ cstate->do_flush = true;
CopySendEndOfRow(cstate);
}
***************
*** 1563,1570 ****
int *defmap;
ExprState **defexprs; /* array of default att expressions */
ExprContext *econtext; /* used for ExecEvalExpr for default atts */
- MemoryContext oldcontext = CurrentMemoryContext;
ErrorContextCallback errcontext;
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
--- 1740,1747 ----
int *defmap;
ExprState **defexprs; /* array of default att expressions */
ExprContext *econtext; /* used for ExecEvalExpr for default atts */
ErrorContextCallback errcontext;
+ cstate->oldcontext = CurrentMemoryContext;
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
***************
*** 1677,1683 ****
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (missing flags)")));
file_has_oids = (tmp & (1 << 16)) != 0;
! tmp &= ~(1 << 16);
if ((tmp >> 16) != 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
--- 1854,1861 ----
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (missing flags)")));
file_has_oids = (tmp & (1 << 16)) != 0;
! cstate->compression = (tmp & (2 << 16)) != 0;
! tmp &= ~(3 << 16);
if ((tmp >> 16) != 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
***************
*** 1696,1701 ****
--- 1874,1883 ----
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (wrong length)")));
}
+
+ if (cstate->compression)
+ cstate->do_compress = true;
+ cstate->use_raw_buf = true;
}
if (file_has_oids && cstate->binary)
***************
*** 1913,1919 ****
HeapTupleSetOid(tuple, loaded_oid);
/* Triggers and stuff need to be invoked in query context. */
! MemoryContextSwitchTo(oldcontext);
skip_tuple = false;
--- 2095,2101 ----
HeapTupleSetOid(tuple, loaded_oid);
/* Triggers and stuff need to be invoked in query context. */
! MemoryContextSwitchTo(cstate->oldcontext);
skip_tuple = false;
***************
*** 1958,1970 ****
* tuples inserted by an INSERT command.
*/
cstate->processed++;
}
}
/* Done, clean up */
error_context_stack = errcontext.previous;
! MemoryContextSwitchTo(oldcontext);
/* Execute AFTER STATEMENT insertion triggers */
ExecASInsertTriggers(estate, resultRelInfo);
--- 2140,2159 ----
* tuples inserted by an INSERT command.
*/
cstate->processed++;
+
+ if (cstate->progress && (cstate->processed % cstate->progress) == 0)
+ {
+ error_context_stack = errcontext.previous;
+ ereport(NOTICE, (errmsg("COPY " UINT64_FORMAT, cstate->processed)));
+ error_context_stack = &errcontext;
+ }
}
}
/* Done, clean up */
error_context_stack = errcontext.previous;
! MemoryContextSwitchTo(cstate->oldcontext);
/* Execute AFTER STATEMENT insertion triggers */
ExecASInsertTriggers(estate, resultRelInfo);
Index: src/backend/parser/gram.y
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/parser/gram.y,v
retrieving revision 2.545
diff -c -r2.545 gram.y
*** src/backend/parser/gram.y 27 May 2006 17:38:45 -0000 2.545
--- src/backend/parser/gram.y 31 May 2006 08:52:56 -0000
***************
*** 366,372 ****
CACHE CALLED CASCADE CASCADED CASE CAST CHAIN CHAR_P
CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT
! COMMITTED CONNECTION CONSTRAINT CONSTRAINTS CONVERSION_P CONVERT COPY CREATE CREATEDB
CREATEROLE CREATEUSER CROSS CSV CURRENT_DATE CURRENT_ROLE CURRENT_TIME
CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE
--- 366,372 ----
CACHE CALLED CASCADE CASCADED CASE CAST CHAIN CHAR_P
CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT
! COMMITTED COMPRESSION CONNECTION CONSTRAINT CONSTRAINTS CONVERSION_P CONVERT COPY CREATE CREATEDB
CREATEROLE CREATEUSER CROSS CSV CURRENT_DATE CURRENT_ROLE CURRENT_TIME
CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE
***************
*** 408,414 ****
PARTIAL PASSWORD PLACING POSITION
PRECISION PRESERVE PREPARE PREPARED PRIMARY
! PRIOR PRIVILEGES PROCEDURAL PROCEDURE
QUOTE
--- 408,414 ----
PARTIAL PASSWORD PLACING POSITION
PRECISION PRESERVE PREPARE PREPARED PRIMARY
! PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRESS
QUOTE
***************
*** 1649,1654 ****
--- 1649,1662 ----
{
$$ = makeDefElem("binary", (Node *)makeInteger(TRUE));
}
+ | COMPRESSION
+ {
+ $$ = makeDefElem("compression", (Node *)makeInteger(TRUE));
+ }
+ | PROGRESS opt_as Iconst
+ {
+ $$ = makeDefElem("progress", (Node *)makeInteger($3));
+ }
| OIDS
{
$$ = makeDefElem("oids", (Node *)makeInteger(TRUE));
***************
*** 8369,8374 ****
--- 8377,8383 ----
| COMMENT
| COMMIT
| COMMITTED
+ | COMPRESSION
| CONNECTION
| CONSTRAINTS
| CONVERSION_P
***************
*** 8476,8481 ****
--- 8485,8491 ----
| PRIVILEGES
| PROCEDURAL
| PROCEDURE
+ | PROGRESS
| QUOTE
| READ
| REASSIGN
Index: src/backend/parser/keywords.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/parser/keywords.c,v
retrieving revision 1.171
diff -c -r1.171 keywords.c
*** src/backend/parser/keywords.c 5 Mar 2006 15:58:32 -0000 1.171
--- src/backend/parser/keywords.c 31 May 2006 08:52:57 -0000
***************
*** 83,88 ****
--- 83,89 ----
{"comment", COMMENT},
{"commit", COMMIT},
{"committed", COMMITTED},
+ {"compression", COMPRESSION},
{"connection", CONNECTION},
{"constraint", CONSTRAINT},
{"constraints", CONSTRAINTS},
***************
*** 267,272 ****
--- 268,274 ----
{"privileges", PRIVILEGES},
{"procedural", PROCEDURAL},
{"procedure", PROCEDURE},
+ {"progress", PROGRESS},
{"quote", QUOTE},
{"read", READ},
{"real", REAL},
Index: src/bin/psql/copy.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/bin/psql/copy.c,v
retrieving revision 1.61
diff -c -r1.61 copy.c
*** src/bin/psql/copy.c 26 May 2006 19:51:29 -0000 1.61
--- src/bin/psql/copy.c 31 May 2006 08:53:00 -0000
***************
*** 62,70 ****
--- 62,72 ----
bool psql_inout; /* true = use psql stdin/stdout */
bool from;
bool binary;
+ bool compression;
bool oids;
bool csv_mode;
bool header;
+ long progress;
char *delim;
char *null;
char *quote;
***************
*** 139,145 ****
}
result->table = pg_strdup(token);
-
token = strtokx(NULL, whitespace, ".,()", "\"",
0, false, pset.encoding);
if (!token)
--- 141,146 ----
***************
*** 283,288 ****
--- 284,297 ----
result->oids = true;
else if (pg_strcasecmp(token, "binary") == 0)
result->binary = true;
+ else if (pg_strcasecmp(token, "compression") == 0)
+ result->compression = true;
+ else if (pg_strcasecmp(token, "progress") == 0)
+ {
+ token = strtokx(NULL, whitespace, NULL, NULL,
+ 0, false, pset.encoding);
+ result->progress = atol(token);
+ }
else if (pg_strcasecmp(token, "csv") == 0)
result->csv_mode = true;
else if (pg_strcasecmp(token, "header") == 0)
***************
*** 478,483 ****
--- 487,497 ----
appendPQExpBuffer(&query, " WITH NULL AS '%s'", options->null);
}
+ if (options->compression)
+ appendPQExpBuffer(&query, " COMPRESSION");
+ if (options->progress)
+ appendPQExpBuffer(&query, " PROGRESS %ld", options->progress);
+
if (options->csv_mode)
appendPQExpBuffer(&query, " CSV");
Index: src/include/utils/pg_lzcompress.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/utils/pg_lzcompress.h,v
retrieving revision 1.11
diff -c -r1.11 pg_lzcompress.h
*** src/include/utils/pg_lzcompress.h 25 May 2005 21:40:42 -0000 1.11
--- src/include/utils/pg_lzcompress.h 31 May 2006 08:53:02 -0000
***************
*** 56,62 ****
* ----------
*/
#define PGLZ_IS_COMPRESSED(_lzdata) ((_lzdata)->varsize != \
! e (_lzdata)->rawsize + e \
sizeof(PGLZ_Header))
/* ----------
--- 56,62 ----
* ----------
*/
#define PGLZ_IS_COMPRESSED(_lzdata) ((_lzdata)->varsize != \
! (_lzdata)->rawsize + \
sizeof(PGLZ_Header))
/* ----------