Volkan YAZICI wrote:
> Here's a new try. This one touches to pg_dump side too - for v3
> COPY functions usage instead of deprecated ones.
>
> I'd be so appreciated if somebody can review attached patch.
I have updated the patch to match CVS (attached), but am seeing the
following regression differences where the COPY error messages are now
missing. Any idea what is causing that?
--
Bruce Momjian | http://candle.pha.pa.us
pgman@candle.pha.pa.us | (610) 359-1001
+ If your life is a hard drive, | 13 Roberts Road
+ Christ can be your backup. | Newtown Square, Pennsylvania 19073
Index: src/backend/commands/copy.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/commands/copy.c,v
retrieving revision 1.258
diff -c -c -r1.258 copy.c
*** src/backend/commands/copy.c 3 Feb 2006 12:41:07 -0000 1.258
--- src/backend/commands/copy.c 12 Feb 2006 04:22:16 -0000
***************
*** 102,107 ****
--- 102,108 ----
int client_encoding; /* remote side's character encoding */
bool need_transcoding; /* client encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
+ uint64 processed; /* # of tuples processed */
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
***************
*** 710,716 ****
* Do not allow the copy if user doesn't have proper permission to access
* the table.
*/
! void
DoCopy(const CopyStmt *stmt)
{
CopyState cstate;
--- 711,717 ----
* Do not allow the copy if user doesn't have proper permission to access
* the table.
*/
! uint64
DoCopy(const CopyStmt *stmt)
{
CopyState cstate;
***************
*** 724,729 ****
--- 725,731 ----
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
AclResult aclresult;
ListCell *option;
+ uint64 processed;
/* Allocate workspace and zero all fields */
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
***************
*** 1019,1024 ****
--- 1021,1027 ----
cstate->line_buf_converted = false;
cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
cstate->raw_buf_index = cstate->raw_buf_len = 0;
+ cstate->processed = 0;
/* Set up encoding conversion info */
cstate->client_encoding = pg_get_client_encoding();
***************
*** 1164,1170 ****
--- 1167,1176 ----
pfree(cstate->attribute_buf.data);
pfree(cstate->line_buf.data);
pfree(cstate->raw_buf);
+
+ processed = cstate->processed;
pfree(cstate);
+ return processed;
}
***************
*** 1396,1401 ****
--- 1402,1409 ----
VARSIZE(outputbytes) - VARHDRSZ);
}
}
+
+ cstate->processed++;
}
CopySendEndOfRow(cstate);
***************
*** 2002,2007 ****
--- 2010,2017 ----
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, tuple);
+
+ cstate->processed++;
}
}
Index: src/backend/tcop/utility.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/tcop/utility.c,v
retrieving revision 1.251
diff -c -c -r1.251 utility.c
*** src/backend/tcop/utility.c 11 Feb 2006 22:17:19 -0000 1.251
--- src/backend/tcop/utility.c 12 Feb 2006 04:22:17 -0000
***************
*** 640,646 ****
break;
case T_CopyStmt:
! DoCopy((CopyStmt *) parsetree);
break;
case T_PrepareStmt:
--- 640,651 ----
break;
case T_CopyStmt:
! {
! uint64 processed = DoCopy((CopyStmt *) parsetree);
!
! snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
! "COPY " UINT64_FORMAT, processed);
! }
break;
case T_PrepareStmt:
Index: src/bin/pg_dump/pg_backup_db.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v
retrieving revision 1.68
diff -c -c -r1.68 pg_backup_db.c
*** src/bin/pg_dump/pg_backup_db.c 9 Feb 2006 18:28:29 -0000 1.68
--- src/bin/pg_dump/pg_backup_db.c 12 Feb 2006 04:22:17 -0000
***************
*** 388,393 ****
--- 388,395 ----
* enter COPY mode; this allows us to behave reasonably when trying
* to continue after an error in a COPY command.
*/
+ if (AH->pgCopyIn &&
+ PQputCopyData(AH->connection, AH->pgCopyBuf->data, AH->pgCopyBuf->len) < 0)
if (AH->pgCopyIn && PQputline(AH->connection, AH->pgCopyBuf->data) != 0)
die_horribly(AH, modulename, "error returned by PQputline: %s",
PQerrorMessage(AH->connection));
***************
*** 400,407 ****
if (isEnd)
{
! if (AH->pgCopyIn && PQendcopy(AH->connection) != 0)
! die_horribly(AH, modulename, "error returned by PQendcopy: %s",
PQerrorMessage(AH->connection));
AH->pgCopyIn = false;
--- 402,409 ----
if (isEnd)
{
! if (AH->pgCopyIn && PQputCopyEnd(AH->connection, NULL) < 0)
! die_horribly(AH, modulename, "error returned by PQputCopyEnd: %s",
PQerrorMessage(AH->connection));
AH->pgCopyIn = false;
Index: src/bin/pg_dump/pg_dump.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v
retrieving revision 1.428
diff -c -c -r1.428 pg_dump.c
*** src/bin/pg_dump/pg_dump.c 12 Feb 2006 03:22:18 -0000 1.428
--- src/bin/pg_dump/pg_dump.c 12 Feb 2006 04:22:21 -0000
***************
*** 778,799 ****
* to be dumped.
*/
- #define COPYBUFSIZ 8192
-
static int
dumpTableData_copy(Archive *fout, void *dcontext)
{
! TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
! TableInfo *tbinfo = tdinfo->tdtable;
! const char *classname = tbinfo->dobj.name;
! const bool hasoids = tbinfo->hasoids;
! const bool oids = tdinfo->oids;
! PQExpBuffer q = createPQExpBuffer();
! PGresult *res;
! int ret;
! bool copydone;
! char copybuf[COPYBUFSIZ];
! const char *column_list;
if (g_verbose)
write_msg(NULL, "dumping contents of table %s\n", classname);
--- 778,796 ----
* to be dumped.
*/
static int
dumpTableData_copy(Archive *fout, void *dcontext)
{
! TableDataInfo *tdinfo = (TableDataInfo *) dcontext;
! TableInfo *tbinfo = tdinfo->tdtable;
! const char *classname = tbinfo->dobj.name;
! const bool hasoids = tbinfo->hasoids;
! const bool oids = tdinfo->oids;
! PQExpBuffer q = createPQExpBuffer();
! PGresult *res;
! int ret;
! char *copybuf;
! const char *column_list;
if (g_verbose)
write_msg(NULL, "dumping contents of table %s\n", classname);
***************
*** 834,867 ****
res = PQexec(g_conn, q->data);
check_sql_result(res, g_conn, q->data, PGRES_COPY_OUT);
! copydone = false;
!
! while (!copydone)
{
! ret = PQgetline(g_conn, copybuf, COPYBUFSIZ);
! if (copybuf[0] == '\\' &&
! copybuf[1] == '.' &&
! copybuf[2] == '\0')
! {
! copydone = true; /* don't print this... */
! }
! else
{
archputs(copybuf, fout);
! switch (ret)
! {
! case EOF:
! copydone = true;
! /* FALLTHROUGH */
! case 0:
! archputs("\n", fout);
! break;
! case 1:
! break;
! }
}
/*
* THROTTLE:
*
--- 831,851 ----
res = PQexec(g_conn, q->data);
check_sql_result(res, g_conn, q->data, PGRES_COPY_OUT);
! for (;;)
{
! ret = PQgetCopyData(g_conn, &copybuf, 0);
! /* buffer is filled by libpq */
! if (ret > 0)
{
archputs(copybuf, fout);
! PQfreemem(copybuf);
}
+ /* copy completed (or an error occurred) */
+ else if (ret < 0)
+ break;
+
/*
* THROTTLE:
*
***************
*** 904,913 ****
}
archprintf(fout, "\\.\n\n\n");
! ret = PQendcopy(g_conn);
! if (ret != 0)
{
! write_msg(NULL, "SQL command to dump the contents of table \"%s\" failed: PQendcopy() failed.\n", classname);
write_msg(NULL, "Error message from server: %s", PQerrorMessage(g_conn));
write_msg(NULL, "The command was: %s\n", q->data);
exit_nicely();
--- 888,897 ----
}
archprintf(fout, "\\.\n\n\n");
! /* copy failed */
! if (ret == -2)
{
! write_msg(NULL, "SQL command to dump the contents of table \"%s\" failed: PQgetCopyData() failed.\n",
classname);
write_msg(NULL, "Error message from server: %s", PQerrorMessage(g_conn));
write_msg(NULL, "The command was: %s\n", q->data);
exit_nicely();
Index: src/bin/psql/common.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/psql/common.c,v
retrieving revision 1.112
diff -c -c -r1.112 common.c
*** src/bin/psql/common.c 12 Feb 2006 03:30:21 -0000 1.112
--- src/bin/psql/common.c 12 Feb 2006 04:22:22 -0000
***************
*** 854,867 ****
* Returns true if the query executed successfully, false otherwise.
*/
static bool
! ProcessCopyResult(PGresult *results)
{
! bool success = false;
! if (!results)
return false;
! switch (PQresultStatus(results))
{
case PGRES_TUPLES_OK:
case PGRES_COMMAND_OK:
--- 854,869 ----
* Returns true if the query executed successfully, false otherwise.
*/
static bool
! ProcessCopyResult(PGresult **results)
{
! bool success, req_getres;
! if (!*results)
return false;
! success = req_getres = false;
!
! switch (PQresultStatus(*results))
{
case PGRES_TUPLES_OK:
case PGRES_COMMAND_OK:
***************
*** 871,881 ****
break;
case PGRES_COPY_OUT:
! success = handleCopyOut(pset.db, pset.queryFout);
break;
case PGRES_COPY_IN:
! success = handleCopyIn(pset.db, pset.cur_cmd_source);
break;
default:
--- 873,883 ----
break;
case PGRES_COPY_OUT:
! req_getres = success = handleCopyOut(pset.db, pset.queryFout);
break;
case PGRES_COPY_IN:
! req_getres = success = handleCopyIn(pset.db, pset.cur_cmd_source);
break;
default:
***************
*** 885,890 ****
--- 887,904 ----
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
return false;
+
+ /*
+ * We need a last PQgetResult() call to learn whether COPY
+ * succeeded or not.
+ */
+ if (req_getres)
+ {
+ PQclear(*results);
+ *results = PQgetResult(pset.db);
+ if (PQresultStatus(*results) != PGRES_COMMAND_OK)
+ success = false;
+ }
return success;
}
***************
*** 1068,1074 ****
results = PQexec(pset.db, query);
/* these operations are included in the timing result: */
! OK = (AcceptResult(results, query) && ProcessCopyResult(results));
if (pset.timing)
GETTIMEOFDAY(&after);
--- 1082,1088 ----
results = PQexec(pset.db, query);
/* these operations are included in the timing result: */
! OK = (AcceptResult(results, query) && ProcessCopyResult(&results));
if (pset.timing)
GETTIMEOFDAY(&after);
Index: src/bin/psql/copy.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/psql/copy.c,v
retrieving revision 1.58
diff -c -c -r1.58 copy.c
*** src/bin/psql/copy.c 15 Oct 2005 02:49:40 -0000 1.58
--- src/bin/psql/copy.c 12 Feb 2006 04:22:22 -0000
***************
*** 585,656 ****
return success;
}
! #define COPYBUFSIZ 8192 /* size doesn't matter */
!
/*
* handleCopyOut
* receives data as a result of a COPY ... TO stdout command
- *
- * If you want to use COPY TO in your application, this is the code to steal :)
- *
- * conn should be a database connection that you just called COPY TO on
- * (and which gave you PGRES_COPY_OUT back);
- * copystream is the file stream you want the output to go to
*/
bool
handleCopyOut(PGconn *conn, FILE *copystream)
{
! bool copydone = false; /* haven't started yet */
! char copybuf[COPYBUFSIZ];
! int ret;
! while (!copydone)
{
! ret = PQgetline(conn, copybuf, COPYBUFSIZ);
! if (copybuf[0] == '\\' &&
! copybuf[1] == '.' &&
! copybuf[2] == '\0')
{
! copydone = true; /* we're at the end */
}
! else
{
! fputs(copybuf, copystream);
! switch (ret)
! {
! case EOF:
! copydone = true;
! /* FALLTHROUGH */
! case 0:
! fputc('\n', copystream);
! break;
! case 1:
! break;
! }
}
}
fflush(copystream);
- ret = !PQendcopy(conn);
ResetCancelConn();
! return ret;
}
-
-
/*
* handleCopyIn
* receives data as a result of a COPY ... FROM stdin command
- *
- * Again, if you want to use COPY FROM in your application, copy this.
- *
- * conn should be a database connection that you just called COPY FROM on
- * (and which gave you PGRES_COPY_IN back);
- * copystream is the file stream you want the input to come from
*/
-
bool
handleCopyIn(PGconn *conn, FILE *copystream)
{
--- 585,651 ----
return success;
}
+ /*
+ * Function routines for handling COPY IN/OUT input/output.
+ *
+ * If you want to use COPY TO in your application, this is the
+ * code to steal ;)
+ *
+ * conn should be a database connection that you just issued COPY FROM/TO
+ * and copystream is the file stream for input/output to read/go.
+ *
+ * PGRES_COPY_IN and PGRES_COPY_OUT results will be untouched, so you
+ * should make a PQgetResult() call at the end to learn whether COPY
+ * succeeded or not. (This will bring COPY command's commandTag string too.)
+ */
! /* chunk size will be used during COPY IN - size doesn't matter */
! #define COPYBUFSIZ 8192
/*
* handleCopyOut
* receives data as a result of a COPY ... TO stdout command
*/
bool
handleCopyOut(PGconn *conn, FILE *copystream)
{
! char *buf;
! int ret;
! bool res = true;
! for (;;)
{
! ret = PQgetCopyData(conn, &buf, 0);
! /* Buffer is filled by libpq */
! if (ret > 0)
{
! fputs(buf, copystream);
! PQfreemem(buf);
}
!
! /* copy done */
! else if (ret == -1)
! break;
!
! /* oops */
! else if (ret == -2)
{
! res = false;
! break;
}
}
+
fflush(copystream);
ResetCancelConn();
!
! return res;
}
/*
* handleCopyIn
* receives data as a result of a COPY ... FROM stdin command
*/
bool
handleCopyIn(PGconn *conn, FILE *copystream)
{
***************
*** 658,670 ****
bool copydone = false;
bool firstload;
bool linedone;
! bool saw_cr = false;
! char copybuf[COPYBUFSIZ];
char *s;
- int bufleft;
- int c = 0;
int ret;
- unsigned int linecount = 0;
/* Prompt if interactive input */
if (isatty(fileno(copystream)))
--- 653,661 ----
bool copydone = false;
bool firstload;
bool linedone;
! char buf[COPYBUFSIZ];
char *s;
int ret;
/* Prompt if interactive input */
if (isatty(fileno(copystream)))
***************
*** 684,747 ****
fputs(prompt, stdout);
fflush(stdout);
}
firstload = true;
linedone = false;
while (!linedone)
{ /* for each bufferload in line ... */
! /* Fetch string until \n, EOF, or buffer full */
! s = copybuf;
! for (bufleft = COPYBUFSIZ - 1; bufleft > 0; bufleft--)
{
! c = getc(copystream);
! if (c == EOF)
! {
! linedone = true;
! break;
! }
! *s++ = c;
! if (c == '\n')
{
! linedone = true;
! break;
}
! if (c == '\r')
! saw_cr = true;
! }
! *s = '\0';
! /* EOF with empty line-so-far? */
! if (c == EOF && s == copybuf && firstload)
! {
! /*
! * We are guessing a little bit as to the right line-ending
! * here...
! */
! if (saw_cr)
! PQputline(conn, "\\.\r\n");
! else
! PQputline(conn, "\\.\n");
copydone = true;
- if (pset.cur_cmd_interactive)
- puts("\\.");
break;
}
! /* No, so pass the data to the backend */
! PQputline(conn, copybuf);
! /* Check for line consisting only of \. */
if (firstload)
{
! if (strcmp(copybuf, "\\.\n") == 0 ||
! strcmp(copybuf, "\\.\r\n") == 0)
{
copydone = true;
break;
}
firstload = false;
}
}
! linecount++;
}
! ret = !PQendcopy(conn);
! pset.lineno += linecount;
! return ret;
}
--- 675,725 ----
fputs(prompt, stdout);
fflush(stdout);
}
+
firstload = true;
linedone = false;
while (!linedone)
{ /* for each bufferload in line ... */
! s = fgets(buf, COPYBUFSIZ, copystream);
! if (!s)
{
! if (ferror(copystream))
{
! (void) PQputCopyEnd(conn, "Due to fgets() failure!");
! return false;
}
!
copydone = true;
break;
}
!
! /* Locating EOF. (Required for below and buf length.) */
! s = memchr(buf, '\0', COPYBUFSIZ);
!
! /* current line is done. */
! if (*(s-1) == '\n')
! linedone = true;
!
if (firstload)
{
! if (strcmp(buf, "\\.\n") == 0 ||
! strcmp(buf, "\\.\r\n") == 0)
{
copydone = true;
break;
}
+
firstload = false;
}
+
+ ret = PQputCopyData(conn, buf, (s-buf));
+ if (ret < 0)
+ return false;
}
!
! pset.lineno++;
}
!
! return PQputCopyEnd(conn, NULL) < 0 ? false : true;
}
Index: src/include/commands/copy.h
===================================================================
RCS file: /cvsroot/pgsql/src/include/commands/copy.h,v
retrieving revision 1.25
diff -c -c -r1.25 copy.h
*** src/include/commands/copy.h 31 Dec 2004 22:03:28 -0000 1.25
--- src/include/commands/copy.h 12 Feb 2006 04:22:22 -0000
***************
*** 16,22 ****
#include "nodes/parsenodes.h"
!
! extern void DoCopy(const CopyStmt *stmt);
#endif /* COPY_H */
--- 16,21 ----
#include "nodes/parsenodes.h"
! extern uint64 DoCopy(const CopyStmt *stmt);
#endif /* COPY_H */
Index: src/interfaces/libpq/fe-exec.c
===================================================================
RCS file: /cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v
retrieving revision 1.179
diff -c -c -r1.179 fe-exec.c
*** src/interfaces/libpq/fe-exec.c 25 Jan 2006 20:44:32 -0000 1.179
--- src/interfaces/libpq/fe-exec.c 12 Feb 2006 04:22:25 -0000
***************
*** 2177,2183 ****
char *
PQcmdTuples(PGresult *res)
{
! char *p;
if (!res)
return "";
--- 2177,2183 ----
char *
PQcmdTuples(PGresult *res)
{
! char *p, *c;
if (!res)
return "";
***************
*** 2195,2201 ****
p = res->cmdStatus + 6;
else if (strncmp(res->cmdStatus, "FETCH ", 6) == 0)
p = res->cmdStatus + 5;
! else if (strncmp(res->cmdStatus, "MOVE ", 5) == 0)
p = res->cmdStatus + 4;
else
return "";
--- 2195,2202 ----
p = res->cmdStatus + 6;
else if (strncmp(res->cmdStatus, "FETCH ", 6) == 0)
p = res->cmdStatus + 5;
! else if (strncmp(res->cmdStatus, "MOVE ", 5) == 0 ||
! strncmp(res->cmdStatus, "COPY ", 5) == 0)
p = res->cmdStatus + 4;
else
return "";
***************
*** 2203,2216 ****
p++;
if (*p == 0)
! {
! pqInternalNotice(&res->noticeHooks,
! "could not interpret result from server: %s",
! res->cmdStatus);
! return "";
! }
!
! return p;
}
/*
--- 2204,2222 ----
p++;
if (*p == 0)
! goto interpret_error;
!
! /* check if we have an int */
! for (c = p; isdigit((unsigned char) *c); ++c)
! ;
! if (*c == 0)
! return p;
!
! interpret_error:
! pqInternalNotice(&res->noticeHooks,
! "could not interpret result from server: %s",
! res->cmdStatus);
! return "";
}
/*
*** ./expected/copy2.out Wed Dec 28 10:04:33 2005
--- ./results/copy2.out Sat Feb 11 23:21:12 2006
***************
*** 34,51 ****
ERROR: column "d" specified more than once
-- missing data: should fail
COPY x from stdin;
- ERROR: invalid input syntax for integer: ""
- CONTEXT: COPY x, line 1, column a: ""
COPY x from stdin;
- ERROR: missing data for column "e"
- CONTEXT: COPY x, line 1: "2000 230 23 23"
COPY x from stdin;
- ERROR: missing data for column "e"
- CONTEXT: COPY x, line 1: "2001 231 \N \N"
-- extra data: should fail
COPY x from stdin;
- ERROR: extra data after last expected column
- CONTEXT: COPY x, line 1: "2002 232 40 50 60 70 80"
-- various COPY options: delimiters, oids, NULL string
COPY x (b, c, d, e) from stdin with oids delimiter ',' null 'x';
COPY x from stdin WITH DELIMITER AS ';' NULL AS '';
--- 34,43 ----
======================================================================
*** ./expected/domain.out Mon Jan 16 13:12:32 2006
--- ./results/domain.out Sat Feb 11 23:21:13 2006
***************
*** 39,46 ****
INSERT INTO basictest values ('88', 'haha', 'short', '123.1212'); -- Truncate numeric
-- Test copy
COPY basictest (testvarchar) FROM stdin; -- fail
- ERROR: value too long for type character varying(5)
- CONTEXT: COPY basictest, line 1: "notsoshorttext"
COPY basictest (testvarchar) FROM stdin;
select * from basictest;
testint4 | testtext | testvarchar | testnumeric
--- 39,44 ----
***************
*** 126,137 ****
INSERT INTO nulltest values ('a', 'b', 'c', NULL, 'd'); -- Good
-- Test copy
COPY nulltest FROM stdin; --fail
- ERROR: domain dcheck does not allow null values
- CONTEXT: COPY nulltest, line 1: "a b \N d \N"
-- Last row is bad
COPY nulltest FROM stdin;
- ERROR: new row for relation "nulltest" violates check constraint "nulltest_col5_check"
- CONTEXT: COPY nulltest, line 3: "a b c \N a"
select * from nulltest;
col1 | col2 | col3 | col4 | col5
------+------+------+------+------
--- 124,131 ----
======================================================================
*** ./expected/alter_table.out Sat Feb 11 16:58:36 2006
--- ./results/alter_table.out Sat Feb 11 23:21:14 2006
***************
*** 867,874 ****
copy test("........pg.dropped.1........") to stdout;
ERROR: column "........pg.dropped.1........" of relation "test" does not exist
copy test from stdin;
- ERROR: extra data after last expected column
- CONTEXT: COPY test, line 1: "10 11 12"
select * from test;
b | c
---+---
--- 867,872 ----
======================================================================