diff -rcN -xCVS src/interfaces/jdbc/build.xml src/interfaces/myjdbc/build.xml
*** src/interfaces/jdbc/build.xml Sat Nov 29 11:52:09 2003
--- src/interfaces/myjdbc/build.xml Mon Dec 29 13:56:51 2003
***************
*** 117,122 ****
--- 117,123 ----
+
diff -rcN -xCVS src/interfaces/jdbc/org/postgresql/PGConnection.java src/interfaces/myjdbc/org/postgresql/PGConnection.java
*** src/interfaces/jdbc/org/postgresql/PGConnection.java Sat Nov 29 11:52:09 2003
--- src/interfaces/myjdbc/org/postgresql/PGConnection.java Tue Dec 30 00:08:20 2003
***************
*** 17,22 ****
--- 17,23 ----
import java.sql.*;
import org.postgresql.core.Encoding;
+ import org.postgresql.copy.CopyManager;
import org.postgresql.fastpath.Fastpath;
import org.postgresql.largeobject.LargeObjectManager;
***************
*** 31,36 ****
--- 32,43 ----
public PGNotification[] getNotifications();
/**
+ * This returns the COPY API for the current connection.
+ * @since 7.5
+ */
+ public CopyManager getCopyAPI() throws SQLException;
+
+ /**
* This returns the LargeObject API for the current connection.
* @since 7.3
*/
diff -rcN -xCVS src/interfaces/jdbc/org/postgresql/copy/CopyManager.java src/interfaces/myjdbc/org/postgresql/copy/CopyManager.java
*** src/interfaces/jdbc/org/postgresql/copy/CopyManager.java Wed Dec 31 16:00:00 1969
--- src/interfaces/myjdbc/org/postgresql/copy/CopyManager.java Tue Dec 30 00:07:04 2003
***************
*** 0 ****
--- 1,250 ----
+ package org.postgresql.copy;
+
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.io.IOException;
+
+ import java.sql.SQLException;
+
+ import org.postgresql.core.BaseConnection;
+ import org.postgresql.core.PGStream;
+ import org.postgresql.core.Encoding;
+ import org.postgresql.core.Notification;
+ import org.postgresql.util.PSQLState;
+ import org.postgresql.util.PSQLException;
+
+ /**
+ * Implement COPY support in the JDBC driver. This requires a 7.4 server and
+ * a connection with the V3 protocol. Previous versions could not recover
+ * from errors and the connection had to be abandoned which was not acceptable.
+ */
+
+ public class CopyManager
+ {
+ private BaseConnection pgConn;
+ private PGStream pgStream;
+
+ public CopyManager(BaseConnection pgConn, PGStream pgStream)
+ {
+ this.pgConn = pgConn;
+ this.pgStream = pgStream;
+ }
+
+ /**
+ * Copy data from the InputStream into the given table using the
+ * default copy parameters.
+ */
+ public void copyIn(String table, InputStream is) throws SQLException
+ {
+ copyInQuery("COPY " + table + " FROM STDIN", is);
+ }
+
+ /**
+ * Copy data from the InputStream using the given COPY query. This
+ * allows specification of additional copy parameters such as the
+ * delimiter or NULL marker.
+ */
+ public void copyInQuery(String query, InputStream is) throws SQLException
+ {
+
+ synchronized(pgStream) {
+ sendQuery(query);
+ copyResultLoop(is,null);
+ }
+
+ }
+
+ /**
+ * Copy data from the given table to the OutputStream using the
+ * default copy parameters.
+ */
+ public void copyOut(String table, OutputStream os) throws SQLException
+ {
+ copyOutQuery("COPY " + table + " TO STDOUT", os);
+ }
+
+ /**
+ * Copy data to the OutputStream using the given COPY query. This
+ * allows specification of additional copy parameters such as the
+ * delimiter or NULL marker.
+ */
+ public void copyOutQuery(String query, OutputStream os) throws SQLException
+ {
+ synchronized(pgStream) {
+ sendQuery(query);
+ copyResultLoop(null,os);
+ }
+ }
+
+ /**
+ * After the copy query has been go through the possible responses.
+ * The flag which tells us whether we are doing copy in or out is
+ * simply where the InputStream or OutputStream is null.
+ *
+ * This is much like the loop in QueryExecutor, it could be merged
+ * into that, but it would require some generalization of its
+ * current specific tasks. Right now it has its query in m_binds[]
+ * form and expects to return a ResultSet. A more pluggable network
+ * layer would be nice so we could support the V2 and V3 protocols
+ * more cleanly and consider a SPI based layer for an in server
+ * pl/java. In general I think it's a bad idea for PGStream to
+ * be seen anywhere outside of the QueryExecutor.
+ */
+ private void copyResultLoop(InputStream is, OutputStream os) throws SQLException
+ {
+
+ Encoding encoding = pgConn.getEncoding();
+
+ PSQLException topLevelError = null;
+ boolean queryDone = false;
+ while (!queryDone)
+ {
+ int c = pgStream.ReceiveChar();
+ switch (c)
+ {
+ case 'A': // Asynch Notify
+ int pid = pgStream.ReceiveInteger(4);
+ String msg = pgStream.ReceiveString(encoding);
+ pgConn.addNotification(new Notification(msg, pid));
+ break;
+ case 'C': // Command Complete
+ int commandLength = pgStream.ReceiveIntegerR(4);
+ String command = encoding.decode(pgStream.Receive(commandLength - 4 - 1));
+ pgStream.Receive(1);
+ break;
+ case 'E': // Error Message
+ int errorLength = pgStream.ReceiveIntegerR(4);
+ String errorMessage = encoding.decode(pgStream.Receive(errorLength-4));
+ PSQLException error = PSQLException.parseServerError(errorMessage);
+ if (topLevelError != null) {
+ topLevelError.setNextException(error);
+ } else {
+ topLevelError = error;
+ }
+ break;
+ case 'N': // Error Notification
+ int notificationLength = pgStream.ReceiveIntegerR(4);
+ String notificationMessage = encoding.decode(pgStream.Receive(notificationLength-4));
+ pgConn.addWarning(notificationMessage);
+ break;
+ case 'G': // CopyInResponse
+ if (is == null)
+ throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character((char) c));
+ receiveCopyInOutResponse();
+ sendCopyData(is);
+ break;
+ case 'H': // CopyOutResponse
+ if (os == null)
+ throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character((char) c));
+ receiveCopyInOutResponse();
+ break;
+ case 'd': // CopyData
+ if (os == null)
+ throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character((char) c));
+ receiveCopyData(os);
+ break;
+ case 'c': // CopyDone
+ int copyDoneLength = pgStream.ReceiveIntegerR(4);
+ break;
+ case 'Z': // ReadyForQuery
+ int messageLength = pgStream.ReceiveIntegerR(4);
+ char messageStatus = (char)pgStream.ReceiveChar();
+ queryDone = true;
+ break;
+ default:
+ throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character((char) c));
+ }
+ }
+
+ if (topLevelError != null)
+ throw topLevelError;
+
+ }
+
+ private void sendQuery(String query) throws SQLException
+ {
+ Encoding encoding = pgConn.getEncoding();
+ try {
+ pgStream.SendChar('Q');
+ byte message[] = encoding.encode(query);
+ int messageSize = 4 + message.length + 1;
+ pgStream.SendInteger(messageSize,4);
+ pgStream.Send(message);
+ pgStream.SendChar(0);
+ pgStream.flush();
+ }
+ catch (IOException ioe)
+ {
+ throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe);
+ }
+ }
+
+ private void sendCopyData(InputStream is) throws SQLException
+ {
+ byte buf[] = new byte[8192];
+
+ int read = 0;
+
+ while (read >= 0) {
+ try {
+ read = is.read(buf);
+ }
+ catch (IOException ioe)
+ {
+ throw new PSQLException("postgresql.copy.inputsource", PSQLState.DATA_ERROR, ioe);
+ }
+
+ if (read > 0) {
+ try {
+ pgStream.SendChar('d');
+ int messageSize = read+4;
+ pgStream.SendInteger(messageSize,4);
+ pgStream.Send(buf, read);
+ }
+ catch (IOException ioe)
+ {
+ throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe);
+ }
+ }
+ }
+
+ // Send the CopyDone message
+ try {
+ pgStream.SendChar('c');
+ pgStream.SendInteger(4,4);
+ pgStream.flush();
+ }
+ catch (IOException ioe)
+ {
+ throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe);
+ }
+ }
+
+ /**
+ * CopyInResponse and CopyOutResponse have the same field
+ * layouts and we simply discard the results.
+ */
+ private void receiveCopyInOutResponse() throws SQLException
+ {
+ int messageLength = pgStream.ReceiveIntegerR(4);
+ int copyFormat = pgStream.ReceiveIntegerR(1);
+
+ int numColumns = pgStream.ReceiveIntegerR(2);
+ for (int i=0; i= 3)
+ copyManager = new CopyManager(this,pgStream);
+ return copyManager;
+ }
+
/*
* This method is used internally to return an object based around
* org.postgresql's more unique data types.
diff -rcN -xCVS src/interfaces/jdbc/org/postgresql/test/jdbc2/CopyTest.java src/interfaces/myjdbc/org/postgresql/test/jdbc2/CopyTest.java
*** src/interfaces/jdbc/org/postgresql/test/jdbc2/CopyTest.java Wed Dec 31 16:00:00 1969
--- src/interfaces/myjdbc/org/postgresql/test/jdbc2/CopyTest.java Mon Dec 29 20:45:19 2003
***************
*** 0 ****
--- 1,118 ----
+ package org.postgresql.test.jdbc2;
+
+ import junit.framework.TestCase;
+ import org.postgresql.test.TestUtil;
+ import org.postgresql.copy.CopyManager;
+ import java.io.*;
+ import java.sql.*;
+
+ public class CopyTest extends TestCase
+ {
+ private Connection conn;
+ private CopyManager copyManager;
+
+ protected void setUp() throws SQLException
+ {
+ conn = TestUtil.openDB();
+ TestUtil.createTable(conn, "copytesttable", "a int, b text, c float, d text");
+ copyManager = ((org.postgresql.PGConnection)conn).getCopyAPI();
+ }
+
+ protected void tearDown() throws SQLException
+ {
+ TestUtil.dropTable(conn, "copytesttable");
+ TestUtil.closeDB(conn);
+ }
+
+ private byte[] getData()
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ ps.println("35\tSome words go here\t3.14159\tz\\na");
+ ps.println("44\tMore text\t2.71828\th");
+ return baos.toByteArray();
+ }
+
+
+ public void testCopyIn() throws SQLException
+ {
+ if (((org.postgresql.core.BaseConnection)conn).getPGProtocolVersionMajor() < 3)
+ return;
+ InputStream is = new ByteArrayInputStream(getData());
+ copyManager.copyIn("copytesttable", is);
+
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT a,b,c,d FROM copytesttable");
+ int rowcount = 0;
+ while (rs.next()) {
+ if (rowcount == 0) {
+ assertEquals(rs.getInt(1), 35);
+ assertEquals(rs.getString(2), "Some words go here");
+ assertEquals(rs.getDouble(3), 3.14159, 0.00001);
+ assertEquals(rs.getString(4), "z\na");
+ } else if (rowcount == 1) {
+ assertEquals(rs.getInt(1), 44);
+ assertEquals(rs.getString(2), "More text");
+ assertEquals(rs.getDouble(3), 2.71828, 0.00001);
+ assertEquals(rs.getString(4), "h");
+ } else {
+ fail("Too many rows returned.");
+ }
+ rowcount++;
+ }
+ assertEquals(rowcount, 2);
+
+ rs.close();
+ stmt.close();
+
+ }
+
+ public void testCopyOut() throws SQLException
+ {
+ if (((org.postgresql.core.BaseConnection)conn).getPGProtocolVersionMajor() < 3)
+ return;
+ PreparedStatement pstmt = conn.prepareStatement("INSERT INTO copytesttable(a,b,c,d) VALUES (?,?,?,?)");
+
+ pstmt.setInt(1,35);
+ pstmt.setString(2,"Some words go here");
+ pstmt.setDouble(3,3.14159);
+ pstmt.setString(4,"z\na");
+ pstmt.executeUpdate();
+
+ pstmt.setInt(1,44);
+ pstmt.setString(2,"More text");
+ pstmt.setDouble(3,2.71828);
+ pstmt.setString(4,"h");
+ pstmt.executeUpdate();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ copyManager.copyOut("copytesttable", baos);
+
+ byte orig[] = getData();
+ byte server[] = baos.toByteArray();
+
+ assertEquals(orig.length,server.length);
+ for (int i=0; i