Re: Error on PQputline() - Mailing list pgsql-hackers

From Dann Corbit
Subject Re: Error on PQputline()
Date
Msg-id D90A5A6C612A39408103E6ECDD77B82920CE77@voyager.corporate.connx.com
Whole thread Raw
In response to Error on PQputline()  ("Dann Corbit" <DCorbit@connx.com>)
Responses Re: Error on PQputline()
List pgsql-hackers
> -----Original Message-----
> From: Tom Lane [mailto:tgl@sss.pgh.pa.us]
> Sent: Friday, May 17, 2002 4:10 PM
> To: Dann Corbit
> Cc: pgsql-hackers@postgresql.org
> Subject: Re: [HACKERS] Error on PQputline()
>
>
> "Dann Corbit" <DCorbit@connx.com> writes:
> > The contents of the error message are:
> > conn->errorMessage.data    0x00312440 "pqFlush() --
> couldn't send data:
> > errno=0
> > No error A non-blocking socket operation could not be completed
> > immediately.
>
> You're running libpq with the nonblocking mode selected?

Actually no.  It should be the default mode for a connection made by
PQconnectdb().  That's what made the error so puzzling.

> > What is the correct recovery action?
>
> Redesign libpq's nonblock mode :-(.  It's a mess; a quick hack that
> doesn't even try to cover all cases, and is unreliable in the ones it
> does cover.  You can find my previous rants on the subject in the
> archives from a couple years back (around Jan '00 I believe).  IMHO
> we should never have accepted that patch at all.
>
> Short of that, don't use the COPY code with nonblock.

I am trying to figure out if it is faster to bulk copy from a file on
the server or using an API from the client.  It boils down to this:

"Would it be faster to write the data to a file on disk and have the
server read it back on the local host, or to send the data through
libpq client calls?"

It could be that the TCP/IP overhead exceeds the overhead of writing the
file to disk and reading it again.

I have a data statement (in test.h) that consists of 1.6 million rows of
data to spin into the database.

Here is the complete program:

#include <windows.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "libpq-fe.h"
#include "glob.h"               /* member variables in the objects */

#include "test.h"

/*
 * Start Winsock for this process, requesting version 2.2.
 *
 * Returns 1 when WSAStartup() reports success and 0 otherwise (no usable
 * WinSock DLL could be found).  Nothing is printed here; the caller decides
 * how to report the failure.
 */
int init_comm(void)
{
    WSADATA     winsock_info;
    const WORD  wanted_version = MAKEWORD(2, 2);

    if (WSAStartup(wanted_version, &winsock_info) == 0) {
        return 1;
    }
    return 0;
}

/* Forward declaration: ExecuteImmediate() below calls ProcessTuples() before it is defined. */
void            ProcessTuples(void);

/*
 * Run one SQL command on the global connection and classify the outcome.
 *
 * command  SQL text handed to PQexec().
 * q_t      kind of statement; QUERY_TYPE_SELECT and QUERY_TYPE_INSERT get
 *          extra handling below.
 *
 * Side effects: stores the result status in the global rc, sets the global
 * InsertedOID for inserts, and releases the result before returning.  The
 * global result pointer is reset to NULL afterwards so that later code never
 * dereferences freed memory.
 *
 * Returns 0 on success and 1 when the command is considered to have failed.
 */
int ExecuteImmediate(char *command, Qtype q_t)
{
    int problem = 0;

#ifdef _DEBUG
    printf("%s\n", command);
#endif
    result = PQexec(conn, command);
    rc = PQresultStatus(result);

    switch (rc) {
    /* We should never actually call this.  Left in for debugging... */
    /* All tuple processing is handled low-level to pass data back to CONNX */
    case PGRES_TUPLES_OK:       /* Data set successfully created */
#ifdef _DEBUG
        printf("#rows affected %s\n", PQcmdTuples(result));
#endif
        ProcessTuples();
        break;

    case PGRES_EMPTY_QUERY:     /* Empty query supplied; shares the check below */
        /* fallthrough */
    case PGRES_COMMAND_OK:      /* Query succeeds, but returns no results */
        /* If we did a select, we should (at least) have a result set of
         * empty tuples. */
        if (q_t == QUERY_TYPE_SELECT)
            problem = 1;
        break;

    case PGRES_BAD_RESPONSE:
    case PGRES_NONFATAL_ERROR:
    case PGRES_FATAL_ERROR:
        problem = 1;
        break;

    default:
        /* Any other status is not treated as a failure here. */
        break;
    }

    if (q_t == QUERY_TYPE_INSERT) {
        InsertedOID = PQoidValue(result);
#ifdef _DEBUG
        printf("OID of inserted row is %lu\n", (unsigned long) InsertedOID);
#endif
    }

    PQclear(result);
    result = NULL;              /* do not leave a dangling global pointer */
    return problem;
}

/*
 * Report the failure described by the globals rc and result.
 *
 * In _DEBUG builds the status name and the result's error message are
 * printed.  In C++ builds a CPOSTGRESQLException carrying both strings is
 * then thrown; the debug output is emitted first so that it is not skipped
 * by the throw.
 *
 * NOTE(review): ExecuteImmediate() releases result before it returns, so
 * when this is called right after it the error text may no longer be
 * available -- confirm the intended ownership of result.
 */
void HandleProblem(void)
{
    const char *m1 = PQresStatus(rc);
    const char *m2 = PQresultErrorMessage(result);

    /* Keep plain C, non-debug builds free of unused-variable warnings. */
    (void) m1;
    (void) m2;

#ifdef _DEBUG
    printf("status is %s\n", m1);
    printf("result message: %s\n", m2);
#endif
#ifdef __cplusplus
    String err = m1;
    err = err + m2;
    throw Mcnew CPOSTGRESQLException(conn, rc, (LPCSTR)err, szSQLState);
#endif
}

/* Issue "BEGIN work"; a failure reported by ExecuteImmediate() is passed to HandleProblem(). */
void BeginTrans(void)
{
    if (ExecuteImmediate("BEGIN work", QUERY_TYPE_TRANSACT) != 0) {
        HandleProblem();
    }
}

/* Issue "COMMIT work"; a failure reported by ExecuteImmediate() is passed to HandleProblem(). */
void CommitTrans(void)
{
    if (ExecuteImmediate("COMMIT work", QUERY_TYPE_TRANSACT) != 0) {
        HandleProblem();
    }
}

/* Issue "ROLLBACK work"; a failure reported by ExecuteImmediate() is passed to HandleProblem(). */
void RollbackTrans(void)
{
    if (ExecuteImmediate("ROLLBACK work", QUERY_TYPE_TRANSACT) != 0) {
        HandleProblem();
    }
}

/*
 * Print every field of every row of the global result to stdout.
 *
 * Stores the row and field counts in the globals nrows and nfields and uses
 * the globals r and n as loop indices.  Each field is written as
 * " name = value(length)," and each row ends with a newline.
 */
void ProcessTuples(void)
{
    nrows = PQntuples(result);
    nfields = PQnfields(result);
#ifdef _DEBUG
    printf("number of rows returned = %d\n", nrows);
    printf("number of fields returned = %d\n", nfields);
#endif
    for (r = 0; r < nrows; r++) {
        for (n = 0; n < nfields; n++) {
            printf(" %s = %s(%d),",
                   PQfname(result, n),
                   PQgetvalue(result, r, n),
                   PQgetlength(result, r, n));
        }
        printf("\n");
    }
}

/* NOTE(review): not referenced anywhere in the visible program -- confirm whether it is still needed. */
static long     cursor_number = 0;

int             main(void)
{   int             problem;   int             i = 0;
   struct tm      *newtime;   time_t          aclock;
   if (init_comm()) {       conn = PQconnectdb("dbname=connxdatasync host=dannfast");       if (PQstatus(conn) ==
CONNECTION_OK){           char            insert_sql[256];           printf("connection made\n");       } else {
  printf("connection failed\n");           return EXIT_FAILURE;       } 
       puts("DROP TABLE cnx_ds_sis_bill_detl_tb started");       problem = ExecuteImmediate("DROP TABLE
cnx_ds_sis_bill_detl_tb",
QUERY_TYPE_OTHER);       if (problem)           HandleProblem();
       puts("DROP TABLE cnx_ds_sis_bill_detl_tb finished");       puts("CREATE TABLE cnx_ds_sis_bill_detl_tb started");
     problem = ExecuteImmediate("CREATE TABLE cnx_ds_sis_bill_detl_tb 
( extr_stu_id char(10),  term_cyt char(5),  subcode char(5),  tran_seq
int2,  crc int8)", QUERY_TYPE_OTHER);       if (problem)           HandleProblem();
       puts("CREATE TABLE cnx_ds_sis_bill_detl_tb finished");       puts("going to start bulk copy...");

       time(&aclock);       newtime = localtime(&aclock);       puts(asctime(newtime));       result = PQexec(conn,
"COPYcnx_ds_sis_bill_detl_tb FROM STDIN 
DELIMITERS '|'");       problem = 0;       switch (rc = PQresultStatus(result)) {       case PGRES_BAD_RESPONSE:
casePGRES_NONFATAL_ERROR:       case PGRES_FATAL_ERROR:           {               problem = 1;           }       } 
       if (problem)           HandleProblem();
       puts("done with initialization...");
       while (pszBCPdata[i])    {         if (PQputline(conn, pszBCPdata[i++]) == EOF)          printf("Error inserting
dataon row %d\n", 
i-1);    }
       PQputline(conn, "\\.\n");
       PQendcopy(conn);
       puts("finished with bulk copy...");
       time(&aclock);       newtime = localtime(&aclock);       puts(asctime(newtime));
       return EXIT_SUCCESS;   }   puts("initialization of winsock failed.");   return EXIT_FAILURE;
}


pgsql-hackers by date:

Previous
From: Tom Lane
Date:
Subject: Re: Error on PQputline()
Next
From: Tom Lane
Date:
Subject: Re: Updated CREATE FUNCTION syntax