diff -rcN -xCVS src/interfaces/jdbc/build.xml src/interfaces/myjdbc/build.xml *** src/interfaces/jdbc/build.xml Sat Nov 29 11:52:09 2003 --- src/interfaces/myjdbc/build.xml Mon Dec 29 13:56:51 2003 *************** *** 117,122 **** --- 117,123 ---- + diff -rcN -xCVS src/interfaces/jdbc/org/postgresql/PGConnection.java src/interfaces/myjdbc/org/postgresql/PGConnection.java *** src/interfaces/jdbc/org/postgresql/PGConnection.java Sat Nov 29 11:52:09 2003 --- src/interfaces/myjdbc/org/postgresql/PGConnection.java Tue Dec 30 00:08:20 2003 *************** *** 17,22 **** --- 17,23 ---- import java.sql.*; import org.postgresql.core.Encoding; + import org.postgresql.copy.CopyManager; import org.postgresql.fastpath.Fastpath; import org.postgresql.largeobject.LargeObjectManager; *************** *** 31,36 **** --- 32,43 ---- public PGNotification[] getNotifications(); /** + * This returns the COPY API for the current connection. + * @since 7.5 + */ + public CopyManager getCopyAPI() throws SQLException; + + /** * This returns the LargeObject API for the current connection. * @since 7.3 */ diff -rcN -xCVS src/interfaces/jdbc/org/postgresql/copy/CopyManager.java src/interfaces/myjdbc/org/postgresql/copy/CopyManager.java *** src/interfaces/jdbc/org/postgresql/copy/CopyManager.java Wed Dec 31 16:00:00 1969 --- src/interfaces/myjdbc/org/postgresql/copy/CopyManager.java Tue Dec 30 00:07:04 2003 *************** *** 0 **** --- 1,250 ---- + package org.postgresql.copy; + + import java.io.InputStream; + import java.io.OutputStream; + import java.io.IOException; + + import java.sql.SQLException; + + import org.postgresql.core.BaseConnection; + import org.postgresql.core.PGStream; + import org.postgresql.core.Encoding; + import org.postgresql.core.Notification; + import org.postgresql.util.PSQLState; + import org.postgresql.util.PSQLException; + + /** + * Implement COPY support in the JDBC driver. This requires a 7.4 server and + * a connection with the V3 protocol. Previous versions could not recover + * from errors and the connection had to be abandoned which was not acceptable. + */ + + public class CopyManager + { + private BaseConnection pgConn; + private PGStream pgStream; + + public CopyManager(BaseConnection pgConn, PGStream pgStream) + { + this.pgConn = pgConn; + this.pgStream = pgStream; + } + + /** + * Copy data from the InputStream into the given table using the + * default copy parameters. + */ + public void copyIn(String table, InputStream is) throws SQLException + { + copyInQuery("COPY " + table + " FROM STDIN", is); + } + + /** + * Copy data from the InputStream using the given COPY query. This + * allows specification of additional copy parameters such as the + * delimiter or NULL marker. + */ + public void copyInQuery(String query, InputStream is) throws SQLException + { + + synchronized(pgStream) { + sendQuery(query); + copyResultLoop(is,null); + } + + } + + /** + * Copy data from the given table to the OutputStream using the + * default copy parameters. + */ + public void copyOut(String table, OutputStream os) throws SQLException + { + copyOutQuery("COPY " + table + " TO STDOUT", os); + } + + /** + * Copy data to the OutputStream using the given COPY query. This + * allows specification of additional copy parameters such as the + * delimiter or NULL marker. + */ + public void copyOutQuery(String query, OutputStream os) throws SQLException + { + synchronized(pgStream) { + sendQuery(query); + copyResultLoop(null,os); + } + } + + /** + * After the copy query has been go through the possible responses. + * The flag which tells us whether we are doing copy in or out is + * simply where the InputStream or OutputStream is null. + * + * This is much like the loop in QueryExecutor, it could be merged + * into that, but it would require some generalization of its + * current specific tasks. Right now it has its query in m_binds[] + * form and expects to return a ResultSet. A more pluggable network + * layer would be nice so we could support the V2 and V3 protocols + * more cleanly and consider a SPI based layer for an in server + * pl/java. In general I think it's a bad idea for PGStream to + * be seen anywhere outside of the QueryExecutor. + */ + private void copyResultLoop(InputStream is, OutputStream os) throws SQLException + { + + Encoding encoding = pgConn.getEncoding(); + + PSQLException topLevelError = null; + boolean queryDone = false; + while (!queryDone) + { + int c = pgStream.ReceiveChar(); + switch (c) + { + case 'A': // Asynch Notify + int pid = pgStream.ReceiveInteger(4); + String msg = pgStream.ReceiveString(encoding); + pgConn.addNotification(new Notification(msg, pid)); + break; + case 'C': // Command Complete + int commandLength = pgStream.ReceiveIntegerR(4); + String command = encoding.decode(pgStream.Receive(commandLength - 4 - 1)); + pgStream.Receive(1); + break; + case 'E': // Error Message + int errorLength = pgStream.ReceiveIntegerR(4); + String errorMessage = encoding.decode(pgStream.Receive(errorLength-4)); + PSQLException error = PSQLException.parseServerError(errorMessage); + if (topLevelError != null) { + topLevelError.setNextException(error); + } else { + topLevelError = error; + } + break; + case 'N': // Error Notification + int notificationLength = pgStream.ReceiveIntegerR(4); + String notificationMessage = encoding.decode(pgStream.Receive(notificationLength-4)); + pgConn.addWarning(notificationMessage); + break; + case 'G': // CopyInResponse + if (is == null) + throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character((char) c)); + receiveCopyInOutResponse(); + sendCopyData(is); + break; + case 'H': // CopyOutResponse + if (os == null) + throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character((char) c)); + receiveCopyInOutResponse(); + break; + case 'd': // CopyData + if (os == null) + throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character((char) c)); + receiveCopyData(os); + break; + case 'c': // CopyDone + int copyDoneLength = pgStream.ReceiveIntegerR(4); + break; + case 'Z': // ReadyForQuery + int messageLength = pgStream.ReceiveIntegerR(4); + char messageStatus = (char)pgStream.ReceiveChar(); + queryDone = true; + break; + default: + throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character((char) c)); + } + } + + if (topLevelError != null) + throw topLevelError; + + } + + private void sendQuery(String query) throws SQLException + { + Encoding encoding = pgConn.getEncoding(); + try { + pgStream.SendChar('Q'); + byte message[] = encoding.encode(query); + int messageSize = 4 + message.length + 1; + pgStream.SendInteger(messageSize,4); + pgStream.Send(message); + pgStream.SendChar(0); + pgStream.flush(); + } + catch (IOException ioe) + { + throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe); + } + } + + private void sendCopyData(InputStream is) throws SQLException + { + byte buf[] = new byte[8192]; + + int read = 0; + + while (read >= 0) { + try { + read = is.read(buf); + } + catch (IOException ioe) + { + throw new PSQLException("postgresql.copy.inputsource", PSQLState.DATA_ERROR, ioe); + } + + if (read > 0) { + try { + pgStream.SendChar('d'); + int messageSize = read+4; + pgStream.SendInteger(messageSize,4); + pgStream.Send(buf, read); + } + catch (IOException ioe) + { + throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe); + } + } + } + + // Send the CopyDone message + try { + pgStream.SendChar('c'); + pgStream.SendInteger(4,4); + pgStream.flush(); + } + catch (IOException ioe) + { + throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe); + } + } + + /** + * CopyInResponse and CopyOutResponse have the same field + * layouts and we simply discard the results. + */ + private void receiveCopyInOutResponse() throws SQLException + { + int messageLength = pgStream.ReceiveIntegerR(4); + int copyFormat = pgStream.ReceiveIntegerR(1); + + int numColumns = pgStream.ReceiveIntegerR(2); + for (int i=0; i= 3) + copyManager = new CopyManager(this,pgStream); + return copyManager; + } + /* * This method is used internally to return an object based around * org.postgresql's more unique data types. diff -rcN -xCVS src/interfaces/jdbc/org/postgresql/test/jdbc2/CopyTest.java src/interfaces/myjdbc/org/postgresql/test/jdbc2/CopyTest.java *** src/interfaces/jdbc/org/postgresql/test/jdbc2/CopyTest.java Wed Dec 31 16:00:00 1969 --- src/interfaces/myjdbc/org/postgresql/test/jdbc2/CopyTest.java Mon Dec 29 20:45:19 2003 *************** *** 0 **** --- 1,118 ---- + package org.postgresql.test.jdbc2; + + import junit.framework.TestCase; + import org.postgresql.test.TestUtil; + import org.postgresql.copy.CopyManager; + import java.io.*; + import java.sql.*; + + public class CopyTest extends TestCase + { + private Connection conn; + private CopyManager copyManager; + + protected void setUp() throws SQLException + { + conn = TestUtil.openDB(); + TestUtil.createTable(conn, "copytesttable", "a int, b text, c float, d text"); + copyManager = ((org.postgresql.PGConnection)conn).getCopyAPI(); + } + + protected void tearDown() throws SQLException + { + TestUtil.dropTable(conn, "copytesttable"); + TestUtil.closeDB(conn); + } + + private byte[] getData() + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + ps.println("35\tSome words go here\t3.14159\tz\\na"); + ps.println("44\tMore text\t2.71828\th"); + return baos.toByteArray(); + } + + + public void testCopyIn() throws SQLException + { + if (((org.postgresql.core.BaseConnection)conn).getPGProtocolVersionMajor() < 3) + return; + InputStream is = new ByteArrayInputStream(getData()); + copyManager.copyIn("copytesttable", is); + + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT a,b,c,d FROM copytesttable"); + int rowcount = 0; + while (rs.next()) { + if (rowcount == 0) { + assertEquals(rs.getInt(1), 35); + assertEquals(rs.getString(2), "Some words go here"); + assertEquals(rs.getDouble(3), 3.14159, 0.00001); + assertEquals(rs.getString(4), "z\na"); + } else if (rowcount == 1) { + assertEquals(rs.getInt(1), 44); + assertEquals(rs.getString(2), "More text"); + assertEquals(rs.getDouble(3), 2.71828, 0.00001); + assertEquals(rs.getString(4), "h"); + } else { + fail("Too many rows returned."); + } + rowcount++; + } + assertEquals(rowcount, 2); + + rs.close(); + stmt.close(); + + } + + public void testCopyOut() throws SQLException + { + if (((org.postgresql.core.BaseConnection)conn).getPGProtocolVersionMajor() < 3) + return; + PreparedStatement pstmt = conn.prepareStatement("INSERT INTO copytesttable(a,b,c,d) VALUES (?,?,?,?)"); + + pstmt.setInt(1,35); + pstmt.setString(2,"Some words go here"); + pstmt.setDouble(3,3.14159); + pstmt.setString(4,"z\na"); + pstmt.executeUpdate(); + + pstmt.setInt(1,44); + pstmt.setString(2,"More text"); + pstmt.setDouble(3,2.71828); + pstmt.setString(4,"h"); + pstmt.executeUpdate(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + copyManager.copyOut("copytesttable", baos); + + byte orig[] = getData(); + byte server[] = baos.toByteArray(); + + assertEquals(orig.length,server.length); + for (int i=0; i