Index: build.xml =================================================================== RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/build.xml,v retrieving revision 1.47 diff -u -r1.47 build.xml --- build.xml 8 Jun 2004 00:01:47 -0000 1.47 +++ build.xml 24 Jun 2004 09:26:26 -0000 @@ -119,10 +119,11 @@ - + + Index: org/postgresql/PGConnection.java =================================================================== RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/PGConnection.java,v retrieving revision 1.8 diff -u -r1.8 PGConnection.java --- org/postgresql/PGConnection.java 8 Jun 2004 00:01:47 -0000 1.8 +++ org/postgresql/PGConnection.java 24 Jun 2004 09:26:26 -0000 @@ -17,6 +17,7 @@ import java.sql.*; import org.postgresql.core.Encoding; +import org.postgresql.copy.CopyManager; import org.postgresql.fastpath.Fastpath; import org.postgresql.largeobject.LargeObjectManager; @@ -30,6 +31,12 @@ */ public PGNotification[] getNotifications(); + /** + * This returns the COPY API for the current connection. + * @since 7.5 + */ + public CopyManager getCopyAPI() throws SQLException; + /** * This returns the LargeObject API for the current connection. * @since 7.3 Index: org/postgresql/errors.properties =================================================================== RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/errors.properties,v retrieving revision 1.34 diff -u -r1.34 errors.properties --- org/postgresql/errors.properties 21 Jun 2004 03:09:44 -0000 1.34 +++ org/postgresql/errors.properties 24 Jun 2004 09:26:26 -0000 @@ -31,6 +31,10 @@ postgresql.con.tuple:Tuple received before MetaData. postgresql.con.type:Unknown Response Type {0} postgresql.con.user:The user property is missing. It is mandatory. +postgresql.copy.ioerror:An IO error occurred while sending to the backend during COPY - {0} +postgresql.copy.inputsource:An IO error occured while reading from a COPY input source - {0} +postgresql.copy.outputsource:An IO error occured while writing to a COPY output source - {0} +postgresql.copy.type:Copy unexpected response type {0} postgresql.error.exception:Exception: {0} postgresql.error.stacktrace:Stack Trace: postgresql.error.stacktraceend:End of Stack Trace Index: org/postgresql/jdbc1/AbstractJdbc1Connection.java =================================================================== RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/jdbc1/AbstractJdbc1Connection.java,v retrieving revision 1.39 diff -u -r1.39 AbstractJdbc1Connection.java --- org/postgresql/jdbc1/AbstractJdbc1Connection.java 22 Jun 2004 09:36:32 -0000 1.39 +++ org/postgresql/jdbc1/AbstractJdbc1Connection.java 24 Jun 2004 09:26:27 -0000 @@ -29,6 +29,7 @@ import org.postgresql.core.PGStream; import org.postgresql.core.QueryExecutor; import org.postgresql.core.StartupPacket; +import org.postgresql.copy.CopyManager; import org.postgresql.fastpath.Fastpath; import org.postgresql.largeobject.LargeObjectManager; import org.postgresql.util.MD5Digest; @@ -993,6 +994,17 @@ // This holds a reference to the LargeObject API if already open private LargeObjectManager largeobject = null; + // The copy Manager + private CopyManager copyManager = null; + public CopyManager getCopyAPI() throws SQLException + { + if (copyManager == null && getPGProtocolVersionMajor() >= 3) + copyManager = new CopyManager(this,pgStream); + return copyManager; + } + + + /* * This method is used internally to return an object based around * org.postgresql's more unique data types. Index: org/postgresql/test/jdbc2/Jdbc2TestSuite.java =================================================================== RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/test/jdbc2/Jdbc2TestSuite.java,v retrieving revision 1.13 diff -u -r1.13 Jdbc2TestSuite.java --- org/postgresql/test/jdbc2/Jdbc2TestSuite.java 29 Mar 2004 19:17:12 -0000 1.13 +++ org/postgresql/test/jdbc2/Jdbc2TestSuite.java 24 Jun 2004 09:26:27 -0000 @@ -62,7 +62,8 @@ // Fastpath/LargeObject suite.addTestSuite(BlobTest.class); suite.addTestSuite(OID74Test.class); - + suite.addTestSuite(CopyTest.class); + suite.addTestSuite(UpdateableResultTest.class ); suite.addTestSuite(CallableStmtTest.class ); Index: build.local.properties =================================================================== RCS file: build.local.properties diff -N build.local.properties --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ build.local.properties 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,11 @@ +# Default build parameters. These may be overridden by local configuration +# settings in build.local.properties. +# + +fullversion=7.5develschabi +server=localhost +port=5432 +database=jdbcdrivertest +username=jdbcuser +password=jdbc +preparethreshold=42 Index: org/postgresql/copy/CopyManager.java =================================================================== RCS file: org/postgresql/copy/CopyManager.java diff -N org/postgresql/copy/CopyManager.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ org/postgresql/copy/CopyManager.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,238 @@ +package org.postgresql.copy; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; + +import java.sql.SQLException; + +import org.postgresql.core.BaseConnection; +import org.postgresql.core.PGStream; +import org.postgresql.core.Encoding; +import org.postgresql.core.Notification; +import org.postgresql.util.PSQLState; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLWarning; +import org.postgresql.util.ServerErrorMessage; + +/** + * Implement COPY support in the JDBC driver. This requires a 7.4 server and a + * connection with the V3 protocol. Previous versions could not recover from + * errors and the connection had to be abandoned which was not acceptable. + */ + +public class CopyManager { + private BaseConnection pgConn; + private PGStream pgStream; + + public CopyManager(BaseConnection pgConn, PGStream pgStream) { + this.pgConn = pgConn; + this.pgStream = pgStream; + } + + /** + * Copy data from the InputStream into the given table using the default + * copy parameters. + */ + public void copyIn(String table, InputStream is) throws SQLException { + copyInQuery("COPY " + table + " FROM STDIN", is); + } + + /** + * Copy data from the InputStream using the given COPY query. This allows + * specification of additional copy parameters such as the delimiter or NULL + * marker. + */ + public void copyInQuery(String query, InputStream is) throws SQLException { + + synchronized (pgStream) { + sendQuery(query); + copyResultLoop(is, null); + } + + } + + /** + * Copy data from the given table to the OutputStream using the default copy + * parameters. + */ + public void copyOut(String table, OutputStream os) throws SQLException { + copyOutQuery("COPY " + table + " TO STDOUT", os); + } + + /** + * Copy data to the OutputStream using the given COPY query. This allows + * specification of additional copy parameters such as the delimiter or NULL + * marker. + */ + public void copyOutQuery(String query, OutputStream os) throws SQLException { + synchronized (pgStream) { + sendQuery(query); + copyResultLoop(null, os); + } + } + + /** + * After the copy query has been go through the possible responses. The flag + * which tells us whether we are doing copy in or out is simply where the + * InputStream or OutputStream is null. + * + * This is much like the loop in QueryExecutor, it could be merged into + * that, but it would require some generalization of its current specific + * tasks. Right now it has its query in m_binds[] form and expects to return + * a ResultSet. A more pluggable network layer would be nice so we could + * support the V2 and V3 protocols more cleanly and consider a SPI based + * layer for an in server pl/java. In general I think it's a bad idea for + * PGStream to be seen anywhere outside of the QueryExecutor. + */ + private void copyResultLoop(InputStream is, OutputStream os) throws SQLException { + + Encoding encoding = pgConn.getEncoding(); + + PSQLException topLevelError = null; + ServerErrorMessage sem; + boolean queryDone = false; + while (!queryDone) { + int c = pgStream.ReceiveChar(); + + switch (c) { + case 'A' :// Asynch Notify + int pid = pgStream.ReceiveIntegerR(4); + String msg = pgStream.ReceiveString(encoding); + pgConn.addNotification(new Notification(msg, pid)); + break; + case 'C' :// Command Complete + int commandLength = pgStream.ReceiveIntegerR(4); + String command = encoding.decode(pgStream.Receive(commandLength - 4 - 1)); + pgStream.Receive(1); + break; + case 'E' :// Error Message + int errorLength = pgStream.ReceiveIntegerR(4); + String errorMessage = encoding.decode(pgStream.Receive(errorLength - 4)); + sem = new ServerErrorMessage(errorMessage); + + PSQLException error = new PSQLException(sem.toString(), new PSQLState(sem.getSQLState())); + if (topLevelError != null) { + topLevelError.setNextException(error); + } else { + topLevelError = error; + } + break; + case 'N' :// Error Notification + int notificationLength = pgStream.ReceiveIntegerR(4); + String notificationMessage = encoding.decode(pgStream.Receive(notificationLength - 4)); + sem = new ServerErrorMessage(notificationMessage); + PSQLWarning warn = new PSQLWarning(sem); + pgConn.addWarning(warn); + break; + case 'G' :// CopyInResponse + if (is == null) + throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character( + (char) c)); + receiveCopyInOutResponse(); + sendCopyData(is); + break; + case 'H' :// CopyOutResponse + if (os == null) + throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character( + (char) c)); + receiveCopyInOutResponse(); + break; + case 'd' :// CopyData + if (os == null) + throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character( + (char) c)); + receiveCopyData(os); + break; + case 'c' :// CopyDone + int copyDoneLength = pgStream.ReceiveIntegerR(4); + break; + case 'Z' :// ReadyForQuery + int messageLength = pgStream.ReceiveIntegerR(4); + char messageStatus = (char) pgStream.ReceiveChar(); + queryDone = true; + break; + default : + throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character((char) c)); + } + } + + if (topLevelError != null) + throw topLevelError; + + } + + private void sendQuery(String query) throws SQLException { + Encoding encoding = pgConn.getEncoding(); + try { + pgStream.SendChar('Q'); + byte message[] = encoding.encode(query); + int messageSize = 4 + message.length + 1; + pgStream.SendInteger(messageSize, 4); + pgStream.Send(message); + pgStream.SendChar(0); + pgStream.flush(); + } catch (IOException ioe) { + throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe); + } + } + + private void sendCopyData(InputStream is) throws SQLException { + byte buf[] = new byte[8192]; + + int read = 0; + + while (read >= 0) { + try { + read = is.read(buf); + } catch (IOException ioe) { + throw new PSQLException("postgresql.copy.inputsource", PSQLState.DATA_ERROR, ioe); + } + + if (read > 0) { + try { + pgStream.SendChar('d'); + int messageSize = read + 4; + pgStream.SendInteger(messageSize, 4); + pgStream.Send(buf, read); + } catch (IOException ioe) { + throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, + ioe); + } + } + } + + // Send the CopyDone message + try { + pgStream.SendChar('c'); + pgStream.SendInteger(4, 4); + pgStream.flush(); + } catch (IOException ioe) { + throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe); + } + } + + /** + * CopyInResponse and CopyOutResponse have the same field layouts and we + * simply discard the results. + */ + private void receiveCopyInOutResponse() throws SQLException { + int messageLength = pgStream.ReceiveIntegerR(4); + int copyFormat = pgStream.ReceiveIntegerR(1); + + int numColumns = pgStream.ReceiveIntegerR(2); + for (int i = 0; i < numColumns; i++) { + int copyColumnFormat = pgStream.ReceiveIntegerR(2); + } + } + + private void receiveCopyData(OutputStream os) throws SQLException { + int messageLength = pgStream.ReceiveIntegerR(4); + byte data[] = pgStream.Receive(messageLength - 4); + try { + os.write(data); + } catch (IOException ioe) { + throw new PSQLException("postgresql.copy.outputsource", PSQLState.DATA_ERROR, ioe); + } + } +} Index: org/postgresql/test/jdbc2/CopyTest.java =================================================================== RCS file: org/postgresql/test/jdbc2/CopyTest.java diff -N org/postgresql/test/jdbc2/CopyTest.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ org/postgresql/test/jdbc2/CopyTest.java 1 Jan 1970 00:00:00 -0000 @@ -0,0 +1,109 @@ +package org.postgresql.test.jdbc2; + +import junit.framework.TestCase; +import org.postgresql.test.TestUtil; +import org.postgresql.copy.CopyManager; +import java.io.*; +import java.sql.*; + +public class CopyTest extends TestCase { + private Connection conn; + private CopyManager copyManager; + + protected void setUp() throws SQLException { + conn = TestUtil.openDB(); + TestUtil.createTable(conn, "copytesttable", "a int, b text, c float, d text"); + copyManager = ((org.postgresql.PGConnection) conn).getCopyAPI(); + } + + protected void tearDown() throws SQLException { + TestUtil.dropTable(conn, "copytesttable"); + TestUtil.closeDB(conn); + } + + private byte[] getData() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + ps.println("35\tSome words go here\t3.14159\tz\\na"); + ps.println("44\tMore text\t2.71828\th"); + return baos.toByteArray(); + } + + public void testCopyIn() throws SQLException { + if (((org.postgresql.core.BaseConnection) conn).getPGProtocolVersionMajor() < 3) + return; + InputStream is = new ByteArrayInputStream(getData()); + copyManager.copyIn("copytesttable", is); + + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT a,b,c,d FROM copytesttable"); + int rowcount = 0; + while (rs.next()) { + if (rowcount == 0) { + assertEquals(rs.getInt(1), 35); + assertEquals(rs.getString(2), "Some words go here"); + assertEquals(rs.getDouble(3), 3.14159, 0.00001); + assertEquals(rs.getString(4), "z\na"); + } else if (rowcount == 1) { + assertEquals(rs.getInt(1), 44); + assertEquals(rs.getString(2), "More text"); + assertEquals(rs.getDouble(3), 2.71828, 0.00001); + assertEquals(rs.getString(4), "h"); + } else { + fail("Too many rows returned."); + } + rowcount++; + } + assertEquals(rowcount, 2); + + rs.close(); + stmt.close(); + } + + public void testCopyOut() throws SQLException { + if (((org.postgresql.core.BaseConnection) conn).getPGProtocolVersionMajor() < 3) + return; + PreparedStatement pstmt = conn.prepareStatement("INSERT INTO copytesttable(a,b,c,d) VALUES (?,?,?,?)"); + + pstmt.setInt(1, 35); + pstmt.setString(2, "Some words go here"); + pstmt.setDouble(3, 3.14159); + pstmt.setString(4, "z\na"); + pstmt.executeUpdate(); + + pstmt.setInt(1, 44); + pstmt.setString(2, "More text"); + pstmt.setDouble(3, 2.71828); + pstmt.setString(4, "h"); + pstmt.executeUpdate(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + copyManager.copyOut("copytesttable", baos); + + byte orig[] = getData(); + byte server[] = baos.toByteArray(); + + assertEquals(orig.length, server.length); + for (int i = 0; i < orig.length; i++) { + assertEquals(orig[i], server[i]); + } + + pstmt.close(); + } + + public void testCopyInOut() throws SQLException { + if (((org.postgresql.core.BaseConnection) conn).getPGProtocolVersionMajor() < 3) + return; + byte orig[] = getData(); + copyManager.copyIn("copytesttable", new ByteArrayInputStream(orig)); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + copyManager.copyOut("copytesttable", baos); + byte server[] = baos.toByteArray(); + + assertEquals(orig.length, server.length); + for (int i = 0; i < orig.length; i++) { + assertEquals(orig[i], server[i]); + } + } +}