code to perform COPY - Mailing list pgsql-jdbc
From | Michael Adler |
---|---|
Subject | code to perform COPY |
Date | |
Msg-id | Pine.NEB.4.44.0206281617450.10330-100000@reva.sixgirls.org Whole thread Raw |
In response to | Re: Problem with JDBC: no suitable driver (Dave Cramer <Dave@micro-automation.net>) |
Responses |
Re: code to perform COPY
|
List | pgsql-jdbc |
Here's the goods. This has worked pretty well for me. I think it's ready for the next stage of scrutiny. We may want to take a slightly different approach and move some of this logic to PG_Stream. There's a lot of code overhead in sending down the transaction-control queries (BEGIN and COMMIT) and in reading and analyzing the responses. I may not be taking advantage of some existing method, or we may want to add functionality elsewhere to support COPY in a clean fashion. The debug messages are very verbose, so we'll probably want to comment out some of them. For large copies, it prints out a crude KB-per-second figure. I get 220 KB/sec over my LAN. It appears to be CPU-bound on my 1.6 GHz P4, so perhaps that can be improved to the point of being network-bound. As I mentioned in my previous email, since PG_Stream.input_stream is a java.io.InputStream, the multi-byte read/receive method offers no improvement over the single-byte method. I could very well be wrong, though. I still haven't tested it over a high-latency connection.

// *****************
// Postgres COPY handling from org.postgresql.Connection
// *****************

/**
 * This will take the name of a table and an OutputStream, construct a COPY OUT query, send the query
 * (while bypassing QueryExecutor), direct the output of the backend into the OutputStream and
 * then check the backend to see if it's ready for another query.
* @param table the table from which it will copy * @param out OutputStream to which the copy will be sent * @return void * @exception Exception if a database access error occurs * **/ public void copyOut(String table, OutputStream out) throws Exception { synchronized(pg_stream) { String query = "COPY " + table + " TO STDOUT"; if (Driver.logDebug) Driver.debug("Sending query '" + query + "' to backend."); // duplicates statements in QueryExecutor.sendQuery pg_stream.SendChar('Q'); pg_stream.Send(this.getEncoding().encode( query )); pg_stream.SendChar(0); pg_stream.flush(); // check response from backend int response = pg_stream.ReceiveChar(); if (response == 'H') { if (Driver.logDebug) Driver.debug("Received 'H' from backend. The COPY out is beginning."); } else if ( response == 'E') { String error_string = pg_stream.ReceiveString(this.getEncoding()); throw new SQLException( error_string ); } else { throw new SQLException("Copy Out should receive H from backend, but instead received: " + (char)response); } long start_time = System.currentTimeMillis(); int a, b, c; // read input stream one char at a time, but always hold three a = pg_stream.ReceiveChar(); b = pg_stream.ReceiveChar(); c = pg_stream.ReceiveChar(); int counter = 3; while (true) { if ( Driver.logDebug && counter % 100000 == 0) { int rate = counter / (int)(System.currentTimeMillis() - start_time); System.out.println( (counter / 1000) + " KB total at " + rate + " KB/sec"); } if ( a == '\\' && b == '.' && c == '\n' ) { // this sequence of bytes means the copy is over if (Driver.logDebug) Driver.debug("Received '\\.' from backend. The COPY out stream is finished."); break; } out.write(a); a = b; b = c; try { // the following looks unoptimized, but is really the same as java.io.OutputStream.read(byte[] b, intoff, int len) c = pg_stream.ReceiveChar(); } catch (Exception e) { // maybe the connection is screwed, or maybe we can salvage it. I don't know. 
throw e; } counter++; } // the backend should send the string "COPY" String str = pg_stream.ReceiveString(this.getEncoding()); // check to make sure the backend is ready for the next query response = pg_stream.ReceiveChar(); if (response == 'Z') { if (Driver.logDebug) Driver.debug("Received 'Z' from backend. It's ready for the next query. COPY out hascompleted"); } else if ( response == 'E') { String error_string = pg_stream.ReceiveString(this.getEncoding()); throw new SQLException( error_string ); } else { throw new SQLException("Copy should receive Z from backend, but instead received: " + (char)response ); } } return; } /* * This will take the name of a table and a ByteArrayInputStream, construct a COPY IN query, * send the query ( while bypassing QueryExecutor), send the bytes of data and send the * 3-byte sequence that signifies the end of the copy. We enclose this in a transaction to ensure * that the entire stream is copied in, or nothing at all. * @param table the table to which it will copy * @param in InputStream from which the copy will be read * @return void * @exception Exception if a database access error occurs */ public void copyIn (String table, InputStream in) throws Exception { int response; String str; synchronized(pg_stream) { // BEGIN the COPY in transaction pg_stream.SendChar('Q'); pg_stream.Send(this.getEncoding().encode( "BEGIN" )); pg_stream.SendChar(0); pg_stream.flush(); // the backend shoul send the string "CBEGIN" - 'C' for "Completed" and 'BEGIN' for the type of completed query str = pg_stream.ReceiveString(this.getEncoding()); if ( ! str.equals("CBEGIN") ) throw new SQLException("BEGIN seemed to fail before COPY in. Received: '" + str+ "'"); response = pg_stream.ReceiveChar(); if ( response != 'Z' ) throw new SQLException("Backend is not ready for next query. 
Instead of 'Z', received:'" + (char)response + "'"); // now "inside" a transaction and ready for COPYing in String query = "COPY " + table + " FROM STDIN"; //if (Driver.logDebug) Driver.debug("Sending query '" + query + "' to backend."); // duplicates statements in QueryExecutor.sendQuery pg_stream.SendChar('Q'); pg_stream.Send(this.getEncoding().encode( query )); pg_stream.SendChar(0); pg_stream.flush(); // check response from backend response = pg_stream.ReceiveChar(); if (response == 'G') { if (Driver.logDebug) Driver.debug("Received 'G' from backend. The COPY in should now begin."); } else if ( response == 'E') { String error_string = pg_stream.ReceiveString(this.getEncoding()); throw new SQLException( error_string ); } else { throw new SQLException("Copy should receive G from backend, but instead received: " + (char)response ); } // send the whole input stream int b; // a byte placeholder to read from in and send to backend while (true) { try { b = in.read(); } catch (IOException e) { // maybe the connection is screwed, or maybe we can salvage it. I don't know. throw new SQLException("While reading from InputStream, it threw exception: '" + e + "'"); } // we may want to check for the termination string '\\.\n' because we send it later. It wouldn't be goodto send it twice if (b == -1) { break; // the InputStream is finished } else { try { pg_stream.SendChar((char)b); } catch (IOException e) { throw new SQLException("While sending a char to the backend, it threw exception: '" + e + "'" ); } } } // send the special row if (Driver.logDebug) Driver.debug("Sending the byte seqence '\\.'. 
The frontend has finished the COPY in stream."); pg_stream.Send( new byte[] { (byte)'\\', (byte)'.', (byte)'\n' } ); pg_stream.flush(); // the backend send the string "CCOPY" - 'C' for "Completed" and 'COPY' for the type of completed query str = pg_stream.ReceiveString(this.getEncoding()); //if (Driver.logDebug) Driver.debug( "Should be CCOPY: " + str); // check to make sure the backend is ready for the next query response = pg_stream.ReceiveChar(); if (response == 'Z') { if (Driver.logDebug) Driver.debug("Received 'Z' from backend. It's ready for the next query. COPY in hascompleted"); } else if ( response == 'E') { String error_string = pg_stream.ReceiveString(this.getEncoding()); throw new SQLException( error_string ); } else { throw new SQLException("Copy should receive Z from backend, but instead received: " + (char)response ); } // since we reached this point with an error, COMMIT the COPY in transaction pg_stream.SendChar('Q'); pg_stream.Send(this.getEncoding().encode( "COMMIT" )); pg_stream.SendChar(0); pg_stream.flush(); // the backend shoul send the string "CCOMMIT" - 'C' for "Completed" and 'COMMIT' for the type of completed query str = pg_stream.ReceiveString(this.getEncoding()); if ( ! str.equals("CCOMMIT") ) throw new SQLException("COMMIT seemed to fail after COPY in. Received: '" + str+ "'"); response = pg_stream.ReceiveChar(); if ( response != 'Z' ) throw new SQLException("Backend is not ready for next query. Instead of 'Z', received:'" + (char)response + "'"); } } // ***************** // Postgres COPY testing from org.postgresql.test.JDBCTests // ***************** /* * Tests the copyOut functionality by copying data from pg_class into a test table, * then it copies that data out and check the number of 'rows' that came out. 
*/ public void testCopyOut() { try { org.postgresql.Connection conn = (org.postgresql.Connection)JDBC2Tests.openDB(); java.sql.Statement st = conn.createStatement(); st.executeUpdate( "INSERT into copy_out_test SELECT relowner, relpages FROM pg_class LIMIT 20" ); ByteArrayOutputStream out = new ByteArrayOutputStream(); java.sql.ResultSet rs = st.executeQuery( "SELECT count(*) AS row_count FROM copy_out_test" ); rs.first(); int row_count = rs.getInt("row_count"); // the number of rows in the table copy_out_test conn.copyOut("copy_out_test",out); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); int newline_count = 0; for ( int i = in.read(); i != -1; i = in.read() ) { if (i == '\n') newline_count++; // count the number of newlines } // # of newlines should equal original LIMIT # assertEquals(row_count,newline_count); } catch (Exception ex) { assertTrue(ex.getMessage(), false); } } /* * Tests the copyIn functionality by copying in an array of bytes, then checking their integrity in the table */ public void testCopyIn() { try { org.postgresql.Connection conn = (org.postgresql.Connection)JDBC2Tests.openDB(); java.sql.Statement st = conn.createStatement(); byte[] dummy_array = new byte[] { (byte)'T',(byte)'o',(byte)'m',(byte)'\t',(byte)'1',(byte)'2',(byte)'\n', //Tom 12 (byte)'B',(byte)'o',(byte)'b',(byte)'\t',(byte)'2',(byte)'9',(byte)'\n', //Bob 29 (byte)'H',(byte)'a',(byte)'n',(byte)'\t',(byte)'4',(byte)'1',(byte)'\n' //Han 41 }; InputStream in = new ByteArrayInputStream(dummy_array); conn.copyIn("copy_in_test",in); java.sql.ResultSet rs = st.executeQuery( "SELECT sum(two) AS sum FROM copy_in_test" ); rs.first(); int sum = rs.getInt("sum"); assertEquals(sum,82); // the sum of column 'two' should equal 82 (i.e. 
12 + 29 + 41) } catch (Exception ex) { assertTrue(ex.getMessage(), false); } } /* * Test a sequence of copy commands by * */ public void testCopyOutCopyIn() { try { org.postgresql.Connection conn = (org.postgresql.Connection)JDBC2Tests.openDB(); java.sql.Statement st = conn.createStatement(); st.executeUpdate( "INSERT into copy_out_copy_in_test SELECT relowner, relpages FROM pg_class LIMIT 20"); ByteArrayOutputStream out = new ByteArrayOutputStream(); java.sql.ResultSet rs = st.executeQuery( "SELECT count(*) AS row_count FROM copy_out_copy_in_test" ); rs.first(); int row_count = rs.getInt("row_count"); // the number of rows in the table copy_out_copy_in_test conn.copyOut("copy_out_copy_in_test",out); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); int newline_count = 0; for ( int i = in.read(); i != -1; i = in.read() ) { if (i == '\n') newline_count++; // count the number of newlines } // # of newlines should equal original LIMIT # assertEquals(row_count,newline_count); in = new ByteArrayInputStream(out.toByteArray()); conn.copyIn("copy_out_copy_in_test",in); rs = st.executeQuery( "SELECT count(*) AS row_count FROM copy_out_copy_in_test" ); rs.first(); row_count = rs.getInt("row_count"); // the number of rows in the table copy_out_copy_in_test assertEquals(row_count , newline_count*2); } catch (Exception ex) { assertTrue(ex.getMessage(), false); } } /* * Test a sequence of copy commands by copying in an array of bytes, then copying them out * and comparing the lengths of the two arrays. 
*/ public void testCopyInCopyOut() { try { org.postgresql.Connection conn = (org.postgresql.Connection)JDBC2Tests.openDB(); java.sql.Statement st = conn.createStatement(); byte[] input_array = new byte[] { (byte)'T',(byte)'o',(byte)'m',(byte)'\t',(byte)'1',(byte)'2',(byte)'\n', (byte)'B',(byte)'o',(byte)'b',(byte)'\t',(byte)'2',(byte)'9',(byte)'\n', (byte)'H',(byte)'a',(byte)'n',(byte)'\t',(byte)'4',(byte)'1',(byte)'\n' }; InputStream in = new ByteArrayInputStream(input_array); conn.copyIn("copy_in_copy_out_test",in); java.sql.ResultSet rs = st.executeQuery( "SELECT sum(two) AS sum FROM copy_in_copy_out_test" ); rs.first(); int sum = rs.getInt("sum"); assertEquals(sum,82); // the sum of column 'two' should equal 82 (i.e. 12 + 29 + 41) // copy out the everything from the table we just copy'ed into ByteArrayOutputStream out = new ByteArrayOutputStream(); conn.copyOut("copy_in_copy_out_test",out); int input_length = input_array.length; int output_length = out.size(); assertEquals(input_length, output_length); // the input and output array should be the same size } catch (Exception ex) { assertTrue(ex.getMessage(), false); } } // ***************** // test using pipes so that you don't have to read everything into memory before you sent it out. 
// ***************** final org.postgresql.Connection local_con = (org.postgresql.Connection)DriverManager.getConnection("jdbc:postgresql://localhost/test1?loglevel=2","test" , "password"); final org.postgresql.Connection local_con = (org.postgresql.Connection)DriverManager.getConnection("jdbc:postgresql://localhost/test2?loglevel=2","test" , "password"); final PipedOutputStream pout = new PipedOutputStream(); final PipedInputStream pin = new PipedInputStream(pout); final Thread copierOut = new Thread() { public void run() { try { remote_con.copyOut("some_source_table",pout); pout.close(); // if you don't close it, the pipe will be considered "broken" when the Thread ends } catch (Exception e) { System.err.println("Caught error while copying out " + e); } } }; final Thread copierIn = new Thread() { public void run() { try { local_con.copyIn("some_destination_table",pin); } catch (Exception e) { System.err.println("Caught error while copying in " + e); } } }; copierOut.start(); copierIn.start();
pgsql-jdbc by date: