Re: code to perform COPY - Mailing list pgsql-jdbc

From Dave Cramer
Subject Re: code to perform COPY
Msg-id 1025311928.29617.324.camel@inspiron.cramers
In response to code to perform COPY  (Michael Adler <adler@glimpser.org>)
List pgsql-jdbc
Michael,

Any chance you can submit this as a context diff?

cvs diff -c

Dave
On Fri, 2002-06-28 at 16:48, Michael Adler wrote:
>
> Here's the goods. This has worked pretty well for me. I think it's ready
> for the next stage of scrutiny.
>
> We may want to take a slightly different approach and move some of this
> logic to PG_Stream. There's a lot of code overhead in sending the
> transaction-control queries (BEGIN and COMMIT) and parsing the responses.
> I may not be taking advantage of some existing method, or we may want to
> add functionality elsewhere to support COPY cleanly.
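A sketch of the kind of helper this paragraph suggests, assuming the same protocol frames the patch writes by hand: 'Q', the query text and a NUL byte going out; a NUL-terminated "C<tag>" completion string and a 'Z' ready byte coming back. The class and method names are hypothetical, plain java.io streams stand in for PG_Stream, and UTF-8 stands in for the connection's encoding.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;

class SimpleCommand {

    // Reads bytes up to (not including) a NUL terminator, like ReceiveString in the patch.
    static String readCString(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != 0) {
            if (b == -1) throw new IOException("connection closed in the middle of a string");
            buf.write(b);
        }
        return new String(buf.toByteArray(), StandardCharsets.UTF_8);
    }

    // Hypothetical helper: sends 'Q' + command + NUL, then expects "C" + tag
    // (for example "CBEGIN") followed by the 'Z' ready-for-query byte.
    static void run(InputStream in, OutputStream out, String command, String tag)
            throws IOException, SQLException {
        out.write('Q');
        out.write(command.getBytes(StandardCharsets.UTF_8));
        out.write(0);
        out.flush();

        String completed = readCString(in);
        if (!completed.equals("C" + tag)) {
            // an 'E' error reply also lands here, read as "E" + message text
            throw new SQLException(command + " failed. Received: '" + completed + "'");
        }
        int ready = in.read();
        if (ready != 'Z') {
            throw new SQLException("Backend is not ready for next query. Received: " + ready);
        }
    }
}
```

With something like this, each BEGIN or COMMIT block in copyIn would shrink to a single call such as `SimpleCommand.run(in, out, "BEGIN", "BEGIN")`.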
>
> The debug messages are very verbose, so we'll probably want to comment out
> some of them. For large copies, it prints a crude KB/sec figure. I get
> 220 KB/sec over my LAN. It appears to be CPU-bound on my 1.6 GHz P4, so
> perhaps it can be improved to the point of being network-bound.
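The KB/sec figure is bytes divided by elapsed milliseconds (1 byte/ms is 1000 bytes/sec). A minimal sketch of that arithmetic, with a hypothetical class name, clamps the interval to 1 ms so that a very fast first reading cannot trigger an integer division by zero:

```java
class CopyRate {
    // Bytes per millisecond is numerically equal to kilobytes (1000 bytes) per
    // second, which is what the patch's debug line reports. Clamping the interval
    // to at least 1 ms avoids an ArithmeticException when no time has elapsed.
    static long kbPerSecond(long bytes, long elapsedMillis) {
        return bytes / Math.max(1L, elapsedMillis);
    }
}
```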
>
> As I mentioned in my previous email, since PG_Stream.input_stream is a
> java.io.InputStream, the multi-byte read/receive method offers no
> improvement over the single-byte method. I could very well be wrong,
> though. I still haven't tested it over a high-latency connection.
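For what it's worth, the base java.io.InputStream.read(byte[], int, int) is documented to call read() repeatedly, so the two are equivalent only for streams that don't override it; BufferedInputStream and socket streams do override it. A sketch of a chunked copy, assuming the caller may wrap a raw stream itself (class and method names are hypothetical, and whether PG_Stream already buffers its input is not shown in the patch):

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class ChunkedCopy {
    // Moves data in 8 KB chunks instead of one byte per call. For streams that
    // override read(byte[], int, int), each call can deliver many bytes at once.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(buf, 0, buf.length)) != -1) {
            out.write(buf, 0, n);
            total += n;
        }
        out.flush();
        return total;
    }

    // Buffers a raw stream so that even single-byte read() calls are served
    // from memory rather than from the underlying source each time.
    static InputStream buffered(InputStream raw) {
        return raw instanceof BufferedInputStream ? raw : new BufferedInputStream(raw, 8192);
    }
}
```

A blind chunk copy only fits where nothing has to be inspected in flight; both COPY directions here watch for the `\.` end marker, so in practice the chunks would be scanned as they pass through.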
>
>
>     // *****************
>     // Postgres COPY handling from org.postgresql.Connection
>     // *****************
>
>     /**
>      *  This will take the name of a table and an OutputStream, construct a COPY OUT query, send the query
>      *  ( while bypassing QueryExecutor), direct the output of the backend into the OutputStream and
>      *  then check the backend to see if it's ready for another query.
>      * @param table the table from which it will copy
>      * @param out OutputStream to which the copy will be sent
>      * @return void
>      * @exception Exception if a database access error occurs
>      *
>      **/
>
>     public void copyOut(String table, OutputStream out) throws Exception
>     {
>         synchronized(pg_stream) {
>
>             String query = "COPY " + table + " TO STDOUT";
>             if (Driver.logDebug) Driver.debug("Sending query '" + query + "' to backend.");
>
>             // duplicates statements in QueryExecutor.sendQuery
>             pg_stream.SendChar('Q');
>             pg_stream.Send(this.getEncoding().encode( query ));
>             pg_stream.SendChar(0);
>             pg_stream.flush();
>
>             // check response from backend
>             int response = pg_stream.ReceiveChar();
>
>             if (response == 'H') {
>                 if (Driver.logDebug) Driver.debug("Received 'H' from backend. The COPY out is beginning.");
>             } else if ( response == 'E') {
>                 String error_string = pg_stream.ReceiveString(this.getEncoding());
>                 throw new SQLException( error_string );
>             } else {
>                 throw new SQLException("Copy Out should receive H from backend, but instead received: " + (char)response);
>             }
>
>             long start_time = System.currentTimeMillis();
>
>             int a, b, c;
>
>             // read input stream one char at a time, but always hold three
>             a = pg_stream.ReceiveChar();
>             b = pg_stream.ReceiveChar();
>             c = pg_stream.ReceiveChar();
>
>             int counter = 3;
>
>             // the end-of-copy marker only counts at the start of a line; a data value
>             // ending in a backslash and a period is sent escaped as "\\." mid-line
>             boolean at_line_start = true;
>
>             while (true) {
>                 if ( Driver.logDebug && counter % 100000 == 0) {
>                     // bytes per millisecond ~ KB per second; clamp to 1 ms to avoid dividing by zero
>                     long elapsed = Math.max(1, System.currentTimeMillis() - start_time);
>                     long rate = counter / elapsed;
>                     System.out.println( (counter / 1000) + " KB total at " + rate + " KB/sec");
>                 }
>
>                 if ( at_line_start && a == '\\' && b == '.' && c == '\n' ) {
>                     // this sequence of bytes on a line of its own means the copy is over
>                     if (Driver.logDebug) Driver.debug("Received '\\.' from backend. The COPY out stream is finished.");
>                     break;
>                 }
>
>                 out.write(a);
>                 at_line_start = (a == '\n');
>
>                 a = b;
>                 b = c;
>
>                 try {
>                     // the following looks unoptimized, but is really the same as java.io.InputStream.read(byte[] b, int off, int len)
>                     c = pg_stream.ReceiveChar();
>                 } catch (Exception e) {
>                     // maybe the connection is screwed, or maybe we can salvage it. I don't know.
>                     throw e;
>                 }
>
>                 counter++;
>             }
>
>             // the backend should send 'C' followed by "COPY", which ReceiveString returns as "CCOPY"
>             String str = pg_stream.ReceiveString(this.getEncoding());
>
>             // check to make sure the backend is ready for the next query
>             response = pg_stream.ReceiveChar();
>
>             if (response == 'Z') {
>                 if (Driver.logDebug) Driver.debug("Received 'Z' from backend. It's ready for the next query. COPY out has completed");
>             } else if ( response == 'E') {
>                 String error_string = pg_stream.ReceiveString(this.getEncoding());
>                 throw new SQLException( error_string );
>             } else {
>                 throw new SQLException("Copy should receive Z from backend, but instead received: " + (char)response);
>             }
>         }
>
>         return;
>     }
>
>
>     /**
>      *  This will take the name of a table and an InputStream, construct a COPY IN query,
>      *  send the query ( while bypassing QueryExecutor), send the bytes of data and send the
>      *  3-byte sequence that signifies the end of the copy. We enclose this in a transaction to ensure
>      *  that the entire stream is copied in, or nothing at all.
>      * @param table the table to which it will copy
>      * @param in InputStream from which the copy will be read
>      * @return void
>      * @exception Exception if a database access error occurs
>      */
>
>     public void copyIn (String table, InputStream in) throws Exception
>     {
>         int response;
>         String str;
>
>         synchronized(pg_stream) {
>
>             // BEGIN the COPY in transaction
>
>             pg_stream.SendChar('Q');
>             pg_stream.Send(this.getEncoding().encode( "BEGIN" ));
>             pg_stream.SendChar(0);
>             pg_stream.flush();
>
>             // the backend should send the string "CBEGIN" - 'C' for "Completed" and 'BEGIN' for the type of completed query
>             str = pg_stream.ReceiveString(this.getEncoding());
>             if ( ! str.equals("CBEGIN") ) throw new SQLException("BEGIN seemed to fail before COPY in. Received: '" + str + "'");
>             response = pg_stream.ReceiveChar();
>             if ( response != 'Z' ) throw new SQLException("Backend is not ready for next query. Instead of 'Z', received: '" + (char)response + "'");
>
>
>             // now "inside" a transaction and ready for COPYing in
>
>             String query = "COPY " + table + " FROM STDIN";
>             //if (Driver.logDebug) Driver.debug("Sending query '" + query + "' to backend.");
>
>             // duplicates statements in QueryExecutor.sendQuery
>             pg_stream.SendChar('Q');
>             pg_stream.Send(this.getEncoding().encode( query ));
>             pg_stream.SendChar(0);
>             pg_stream.flush();
>
>             // check response from backend
>             response = pg_stream.ReceiveChar();
>
>             if (response == 'G') {
>                 if (Driver.logDebug) Driver.debug("Received 'G' from backend. The COPY in should now begin.");
>             } else if ( response == 'E') {
>                 String error_string = pg_stream.ReceiveString(this.getEncoding());
>                 throw new SQLException( error_string );
>             } else {
>                 throw new SQLException("Copy should receive G from backend, but instead received: " + (char)response);
>             }
>
>
>             // send the whole input stream
>
>             int b; // a byte placeholder to read from in and send to backend
>
>             while (true) {
>
>                 try {
>                     b = in.read();
>                 } catch (IOException e) {
>                     // maybe the connection is screwed, or maybe we can salvage it. I don't know.
>                     throw new SQLException("While reading from InputStream, it threw exception: '" + e + "'");
>                 }
>
>                 // we may want to check for the termination string '\\.\n' because we send it later. It wouldn't be good to send it twice
>
>                 if (b == -1) {
>                     break; // the InputStream is finished
>                 } else {
>                     try {
>                         pg_stream.SendChar((char)b);
>                     } catch (IOException e) {
>                         throw new SQLException("While sending a char to the backend, it threw exception: '" + e + "'");
>                     }
>                 }
>             }
>
>             // send the special row
>             if (Driver.logDebug) Driver.debug("Sending the byte sequence '\\.'. The frontend has finished the COPY in stream.");
>             pg_stream.Send( new byte[] { (byte)'\\', (byte)'.', (byte)'\n' } );
>             pg_stream.flush();
>
>             // the backend should send the string "CCOPY" - 'C' for "Completed" and 'COPY' for the type of completed query
>             str = pg_stream.ReceiveString(this.getEncoding());
>             //if (Driver.logDebug) Driver.debug( "Should be CCOPY: " + str);
>
>             // check to make sure the backend is ready for the next query
>             response = pg_stream.ReceiveChar();
>
>             if (response == 'Z') {
>                 if (Driver.logDebug) Driver.debug("Received 'Z' from backend. It's ready for the next query. COPY in has completed");
>             } else if ( response == 'E') {
>                 String error_string = pg_stream.ReceiveString(this.getEncoding());
>                 throw new SQLException( error_string );
>             } else {
>                 throw new SQLException("Copy should receive Z from backend, but instead received: " + (char)response);
>             }
>
>
>             // since we reached this point without an error, COMMIT the COPY in transaction
>
>             pg_stream.SendChar('Q');
>             pg_stream.Send(this.getEncoding().encode( "COMMIT" ));
>             pg_stream.SendChar(0);
>             pg_stream.flush();
>
>             // the backend should send the string "CCOMMIT" - 'C' for "Completed" and 'COMMIT' for the type of completed query
>             str = pg_stream.ReceiveString(this.getEncoding());
>             if ( ! str.equals("CCOMMIT") ) throw new SQLException("COMMIT seemed to fail after COPY in. Received: '" + str + "'");
>             response = pg_stream.ReceiveChar();
>             if ( response != 'Z' ) throw new SQLException("Backend is not ready for next query. Instead of 'Z', received: '" + (char)response + "'");
>         }
>     }
>
>
>
>
>
>
>
>
>
>
>
>     // *****************
>     // Postgres COPY testing from org.postgresql.test.JDBCTests
>     // *****************
>
>
>
>         /*
>          * Tests the copyOut functionality by copying data from pg_class into a test table,
>          * then copying that data out and checking the number of 'rows' that came out.
>          */
>         public void testCopyOut()
>         {
>                 try
>                 {
>                     org.postgresql.Connection conn = (org.postgresql.Connection)JDBC2Tests.openDB();
>
>                     java.sql.Statement st = conn.createStatement();
>                     st.executeUpdate( "INSERT into copy_out_test SELECT relowner, relpages FROM pg_class LIMIT 20" );
>
>                     ByteArrayOutputStream out = new ByteArrayOutputStream();
>                     java.sql.ResultSet rs = st.executeQuery( "SELECT count(*) AS row_count FROM copy_out_test" );
>                     rs.next();
>                     int row_count = rs.getInt("row_count"); // the number of rows in the table copy_out_test
>
>                     conn.copyOut("copy_out_test",out);
>
>                     ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
>                     int newline_count = 0;
>                     for ( int i = in.read(); i != -1; i = in.read() ) {
>                         if (i == '\n') newline_count++; // count the number of newlines
>                     }
>
>                     // # of newlines should equal the number of rows in the table
>                     assertEquals(row_count,newline_count);
>                 }
>                 catch (Exception ex)
>                     {
>                         fail(ex.getMessage());
>                     }
>         }
>
>
>
>         /*
>          * Tests the copyIn functionality by copying in an array of bytes, then checking their integrity in the table
>          */
>         public void testCopyIn()
>         {
>                 try
>                 {
>                     org.postgresql.Connection conn = (org.postgresql.Connection)JDBC2Tests.openDB();
>                     java.sql.Statement st = conn.createStatement();
>                     byte[] dummy_array = new byte[] {
>                         (byte)'T',(byte)'o',(byte)'m',(byte)'\t',(byte)'1',(byte)'2',(byte)'\n',  //Tom 12
>                         (byte)'B',(byte)'o',(byte)'b',(byte)'\t',(byte)'2',(byte)'9',(byte)'\n',  //Bob 29
>                         (byte)'H',(byte)'a',(byte)'n',(byte)'\t',(byte)'4',(byte)'1',(byte)'\n'   //Han 41
>                     };
>
>                     InputStream in = new ByteArrayInputStream(dummy_array);
>                     conn.copyIn("copy_in_test",in);
>
>                     java.sql.ResultSet rs = st.executeQuery( "SELECT sum(two) AS sum FROM copy_in_test" );
>                     rs.next();
>                     int sum = rs.getInt("sum");
>                     assertEquals(82, sum); // the sum of column 'two' should equal 82 (i.e. 12 + 29 + 41)
>
>                 }
>                 catch (Exception ex)
>                     {
>                         fail(ex.getMessage());
>                     }
>         }
>
>
>
>
>
>         /*
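>          * Test a sequence of copy commands by copying a table's rows out and then
>          * copying the same rows back in, which should double the row count.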
>          */
>         public void testCopyOutCopyIn()
>         {
>                 try
>                 {
>                     org.postgresql.Connection conn = (org.postgresql.Connection)JDBC2Tests.openDB();
>                     java.sql.Statement st = conn.createStatement();
>
>                     st.executeUpdate( "INSERT into copy_out_copy_in_test SELECT relowner, relpages FROM pg_class LIMIT 20" );
>
>                     ByteArrayOutputStream out = new ByteArrayOutputStream();
>                     java.sql.ResultSet rs = st.executeQuery( "SELECT count(*) AS row_count FROM copy_out_copy_in_test" );
>                     rs.next();
>                     int row_count = rs.getInt("row_count"); // the number of rows in the table copy_out_copy_in_test
>
>                     conn.copyOut("copy_out_copy_in_test",out);
>
>                     ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
>                     int newline_count = 0;
>                     for ( int i = in.read(); i != -1; i = in.read() ) {
>                         if (i == '\n') newline_count++; // count the number of newlines
>                     }
>
>                     // # of newlines should equal the number of rows in the table
>                     assertEquals(row_count,newline_count);
>
>                     in = new ByteArrayInputStream(out.toByteArray());
>                     conn.copyIn("copy_out_copy_in_test",in);
>
>                     rs = st.executeQuery( "SELECT count(*) AS row_count FROM copy_out_copy_in_test" );
>                     rs.next();
>                     row_count = rs.getInt("row_count"); // the number of rows in the table copy_out_copy_in_test
>
>                     assertEquals(newline_count*2, row_count);
>                 }
>                 catch (Exception ex)
>                     {
>                         fail(ex.getMessage());
>                     }
>         }
>
>
>
>
>         /*
>          * Test a sequence of copy commands by copying in an array of bytes, then copying them out
>          * and comparing the lengths of the two arrays.
>          */
>         public void testCopyInCopyOut()
>         {
>                 try
>                 {
>                     org.postgresql.Connection conn = (org.postgresql.Connection)JDBC2Tests.openDB();
>
>                     java.sql.Statement st = conn.createStatement();
>                     byte[] input_array = new byte[] {
>                         (byte)'T',(byte)'o',(byte)'m',(byte)'\t',(byte)'1',(byte)'2',(byte)'\n',
>                         (byte)'B',(byte)'o',(byte)'b',(byte)'\t',(byte)'2',(byte)'9',(byte)'\n',
>                         (byte)'H',(byte)'a',(byte)'n',(byte)'\t',(byte)'4',(byte)'1',(byte)'\n'
>                     };
>
>                     InputStream in = new ByteArrayInputStream(input_array);
>                     conn.copyIn("copy_in_copy_out_test",in);
>
>                     java.sql.ResultSet rs = st.executeQuery( "SELECT sum(two) AS sum FROM copy_in_copy_out_test" );
>                     rs.next();
>                     int sum = rs.getInt("sum");
>                     assertEquals(82, sum); // the sum of column 'two' should equal 82 (i.e. 12 + 29 + 41)
>
>                     // copy out everything from the table we just copied into
>
>                     ByteArrayOutputStream out = new ByteArrayOutputStream();
>                     conn.copyOut("copy_in_copy_out_test",out);
>
>                     int  input_length = input_array.length;
>                     int output_length = out.size();
>                     assertEquals(input_length, output_length); // the input and output array should be the same size
>                 }
>                 catch (Exception ex)
>                     {
>                         fail(ex.getMessage());
>                     }
>         }
>
>
>
>
>
>
>
>
>
>
>
>
>     // *****************
>     // test using pipes so that you don't have to read everything into memory before you send it out.
>     // *****************
>
>
>
> final org.postgresql.Connection remote_con =
>     (org.postgresql.Connection)DriverManager.getConnection("jdbc:postgresql://localhost/test1?loglevel=2","test" , "password");
> final org.postgresql.Connection local_con  =
>     (org.postgresql.Connection)DriverManager.getConnection("jdbc:postgresql://localhost/test2?loglevel=2","test" , "password");
>
>
>         final PipedOutputStream pout = new PipedOutputStream();
>         final PipedInputStream  pin  = new PipedInputStream(pout);
>
>         final Thread copierOut = new Thread() {
>                 public void run() {
>                     try {
>                         remote_con.copyOut("some_source_table",pout);
>                         pout.close(); // if you don't close it, the pipe will be considered "broken" when the Thread ends
>                     } catch (Exception e) {
>                         System.err.println("Caught error while copying out " + e);
>                     }
>                 }
>             };
>
>
>         final Thread copierIn = new Thread() {
>                 public void run() {
>                     try {
>                         local_con.copyIn("some_destination_table",pin);
>                     } catch (Exception e) {
>                         System.err.println("Caught error while copying in " + e);
>                     }
>                 }
>             };
>
>         copierOut.start();
>         copierIn.start();
>
>
>
>
>
>
>
>
>
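The end-of-data check in copyOut has to match the `\.` marker only when it starts a line. In COPY's text format a backslash inside a value is sent as `\\`, so a value ending in a backslash and a period makes an ordinary row end with the bytes `\`, `.`, newline. A minimal sketch of a line-aware reader, assuming text-format COPY and plain java.io streams in place of PG_Stream (the class and method names are hypothetical); it buffers one row at a time:

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class CopyOutReader {
    // Copies COPY OUT text rows from 'in' to 'out' and stops after the "\." line,
    // leaving 'in' positioned at whatever the backend sends next. Because whole rows
    // are buffered, the marker is only recognised on a line of its own.
    static long copyUntilEndMarker(InputStream in, OutputStream out) throws IOException {
        ByteArrayOutputStream row = new ByteArrayOutputStream();
        long written = 0;
        int b;
        while ((b = in.read()) != -1) {
            row.write(b);
            if (b != '\n') continue;
            byte[] bytes = row.toByteArray();
            row.reset();
            if (bytes.length == 3 && bytes[0] == '\\' && bytes[1] == '.') {
                return written; // end-of-data marker on a line by itself
            }
            out.write(bytes);
            written += bytes.length;
        }
        throw new EOFException("stream ended before the \\. end-of-data marker");
    }
}
```

With the input rows `Tom<TAB>12`, `a\\.` and then `\.`, this forwards the first two rows intact; a plain three-byte window would stop in the middle of the second row.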

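copyIn's own comment raises the question of input that already contains the `\.` terminator; the input also has to end with a newline for the terminator to sit on its own line. A sketch covering both cases, assuming text-format COPY and plain java.io streams in place of PG_Stream (the class and method names are hypothetical): it forwards rows until it sees a `\.` line, terminates a final row that lacks a newline, and then writes the marker exactly once.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class CopyInWriter {
    private static final byte[] END_MARKER = {'\\', '.', '\n'};

    // True for a row that is exactly "\." with or without its trailing newline.
    private static boolean isEndMarker(byte[] row) {
        return (row.length == 2 || (row.length == 3 && row[2] == '\n'))
                && row[0] == '\\' && row[1] == '.';
    }

    static void sendCopyData(InputStream in, OutputStream out) throws IOException {
        ByteArrayOutputStream row = new ByteArrayOutputStream();
        boolean sawMarker = false;
        int b;
        while (!sawMarker && (b = in.read()) != -1) {
            row.write(b);
            if (b == '\n') {
                byte[] bytes = row.toByteArray();
                row.reset();
                if (isEndMarker(bytes)) {
                    sawMarker = true; // caller already ended the data; don't forward it twice
                } else {
                    out.write(bytes);
                }
            }
        }
        byte[] tail = row.toByteArray();
        if (!sawMarker && tail.length > 0 && !isEndMarker(tail)) {
            out.write(tail);
            out.write('\n'); // last row had no newline; the marker must start its own line
        }
        out.write(END_MARKER);
        out.flush();
    }
}
```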

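copyIn's BEGIN/COMMIT wrapper gives all-or-nothing behaviour only on the success path: when a step after BEGIN throws, the transaction is left open on the connection. A sketch of the usual rollback-on-failure shape, written against two hypothetical one-method interfaces rather than pg_stream:

```java
import java.sql.SQLException;

class CopyInTransaction {
    // Hypothetical stand-in for "send one command and check its completion reply",
    // i.e. the inline BEGIN/COMMIT code in copyIn.
    interface CommandRunner {
        void run(String command) throws SQLException;
    }

    // Hypothetical stand-in for the COPY FROM STDIN exchange itself.
    interface CopyBody {
        void copy() throws SQLException;
    }

    static void inTransaction(CommandRunner runner, CopyBody body) throws SQLException {
        runner.run("BEGIN");
        try {
            body.copy();
        } catch (SQLException | RuntimeException e) {
            // undo the partial COPY and leave the connection outside any transaction
            try {
                runner.run("ROLLBACK");
            } catch (SQLException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            throw e;
        }
        runner.run("COMMIT");
    }
}
```

If the failure left the protocol stream out of step (for example, an 'E' reply whose trailing 'Z' was never read), the ROLLBACK itself may fail, which is why its exception is attached to the original one rather than replacing it.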


