Index: src/backend/commands/copy.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/commands/copy.c,v retrieving revision 1.255 diff -u -c -r1.255 copy.c *** src/backend/commands/copy.c 22 Nov 2005 18:17:08 -0000 1.255 --- src/backend/commands/copy.c 23 Dec 2005 13:29:57 -0000 *************** *** 102,107 **** --- 102,108 ---- int client_encoding; /* remote side's character encoding */ bool need_transcoding; /* client encoding diff from server? */ bool client_only_encoding; /* encoding not valid on server? */ + uint64 processed; /* # of tuples processed */ /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ *************** *** 646,652 **** * Do not allow the copy if user doesn't have proper permission to access * the table. */ ! void DoCopy(const CopyStmt *stmt) { CopyState cstate; --- 647,653 ---- * Do not allow the copy if user doesn't have proper permission to access * the table. */ ! uint64 DoCopy(const CopyStmt *stmt) { CopyState cstate; *************** *** 660,665 **** --- 661,667 ---- AclMode required_access = (is_from ? 
ACL_INSERT : ACL_SELECT); AclResult aclresult; ListCell *option; + uint64 processed; /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); *************** *** 936,941 **** --- 938,944 ---- cstate->line_buf_converted = false; cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); cstate->raw_buf_index = cstate->raw_buf_len = 0; + cstate->processed = 0; /* Set up encoding conversion info */ cstate->client_encoding = pg_get_client_encoding(); *************** *** 1080,1086 **** --- 1083,1092 ---- pfree(cstate->attribute_buf.data); pfree(cstate->line_buf.data); pfree(cstate->raw_buf); + + processed = cstate->processed; pfree(cstate); + return processed; } *************** *** 1310,1315 **** --- 1316,1323 ---- VARSIZE(outputbytes) - VARHDRSZ); } } + + cstate->processed++; } CopySendEndOfRow(cstate); *************** *** 1916,1921 **** --- 1924,1931 ---- /* AFTER ROW INSERT Triggers */ ExecARInsertTriggers(estate, resultRelInfo, tuple); + + cstate->processed++; } } Index: src/backend/tcop/utility.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/tcop/utility.c,v retrieving revision 1.250 diff -u -c -r1.250 utility.c *** src/backend/tcop/utility.c 29 Nov 2005 01:25:49 -0000 1.250 --- src/backend/tcop/utility.c 23 Dec 2005 13:29:59 -0000 *************** *** 640,646 **** break; case T_CopyStmt: ! DoCopy((CopyStmt *) parsetree); break; case T_PrepareStmt: --- 640,651 ---- break; case T_CopyStmt: ! { ! uint64 processed = DoCopy((CopyStmt *) parsetree); ! ! snprintf(completionTag, COMPLETION_TAG_BUFSIZE, ! "COPY " UINT64_FORMAT, processed); ! 
} break; case T_PrepareStmt: Index: src/bin/pg_dump/pg_backup_db.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v retrieving revision 1.66 diff -u -c -r1.66 pg_backup_db.c *** src/bin/pg_dump/pg_backup_db.c 15 Oct 2005 02:49:38 -0000 1.66 --- src/bin/pg_dump/pg_backup_db.c 23 Dec 2005 13:30:00 -0000 *************** *** 389,396 **** *--------- */ ! if (PQputline(AH->connection, AH->pgCopyBuf->data) != 0) ! die_horribly(AH, modulename, "error returned by PQputline\n"); resetPQExpBuffer(AH->pgCopyBuf); --- 389,396 ---- *--------- */ ! if (PQputCopyData(AH->connection, AH->pgCopyBuf->data, AH->pgCopyBuf->len) < 0) ! die_horribly(AH, modulename, "error returned by PQputCopyData\n"); resetPQExpBuffer(AH->pgCopyBuf); *************** *** 400,407 **** if (isEnd) { ! if (PQendcopy(AH->connection) != 0) ! die_horribly(AH, modulename, "error returned by PQendcopy\n"); AH->pgCopyIn = 0; } --- 400,407 ---- if (isEnd) { ! if (PQputCopyEnd(AH->connection, NULL) < 0) ! die_horribly(AH, modulename, "error returned by PQputCopyEnd\n"); AH->pgCopyIn = 0; } Index: src/bin/pg_dump/pg_dump.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v retrieving revision 1.424 diff -u -c -r1.424 pg_dump.c *** src/bin/pg_dump/pg_dump.c 3 Dec 2005 21:06:18 -0000 1.424 --- src/bin/pg_dump/pg_dump.c 23 Dec 2005 13:30:09 -0000 *************** *** 778,799 **** * to be dumped. */ - #define COPYBUFSIZ 8192 - static int dumpTableData_copy(Archive *fout, void *dcontext) { ! TableDataInfo *tdinfo = (TableDataInfo *) dcontext; ! TableInfo *tbinfo = tdinfo->tdtable; ! const char *classname = tbinfo->dobj.name; ! const bool hasoids = tbinfo->hasoids; ! const bool oids = tdinfo->oids; ! PQExpBuffer q = createPQExpBuffer(); ! PGresult *res; ! int ret; ! bool copydone; ! char copybuf[COPYBUFSIZ]; ! 
const char *column_list; if (g_verbose) write_msg(NULL, "dumping contents of table %s\n", classname); --- 778,796 ---- * to be dumped. */ static int dumpTableData_copy(Archive *fout, void *dcontext) { ! TableDataInfo *tdinfo = (TableDataInfo *) dcontext; ! TableInfo *tbinfo = tdinfo->tdtable; ! const char *classname = tbinfo->dobj.name; ! const bool hasoids = tbinfo->hasoids; ! const bool oids = tdinfo->oids; ! PQExpBuffer q = createPQExpBuffer(); ! PGresult *res; ! int ret; ! char *copybuf; ! const char *column_list; if (g_verbose) write_msg(NULL, "dumping contents of table %s\n", classname); *************** *** 834,867 **** res = PQexec(g_conn, q->data); check_sql_result(res, g_conn, q->data, PGRES_COPY_OUT); ! copydone = false; ! ! while (!copydone) { ! ret = PQgetline(g_conn, copybuf, COPYBUFSIZ); ! if (copybuf[0] == '\\' && ! copybuf[1] == '.' && ! copybuf[2] == '\0') ! { ! copydone = true; /* don't print this... */ ! } ! else { archputs(copybuf, fout); ! switch (ret) ! { ! case EOF: ! copydone = true; ! /* FALLTHROUGH */ ! case 0: ! archputs("\n", fout); ! break; ! case 1: ! break; ! } } /* * THROTTLE: * --- 831,851 ---- res = PQexec(g_conn, q->data); check_sql_result(res, g_conn, q->data, PGRES_COPY_OUT); ! for (;;) { ! ret = PQgetCopyData(g_conn, &copybuf, 0); ! /* buffer is filled by libpq */ ! if (ret > 0) { archputs(copybuf, fout); ! PQfreemem(copybuf); } + /* copy completed (or an error occurred) */ + else if (ret < 0) + break; + /* * THROTTLE: * *************** *** 904,913 **** } archprintf(fout, "\\.\n\n\n"); ! ret = PQendcopy(g_conn); ! if (ret != 0) { ! write_msg(NULL, "SQL command to dump the contents of table \"%s\" failed: PQendcopy() failed.\n", classname); write_msg(NULL, "Error message from server: %s", PQerrorMessage(g_conn)); write_msg(NULL, "The command was: %s\n", q->data); exit_nicely(); --- 888,897 ---- } archprintf(fout, "\\.\n\n\n"); ! /* copy failed */ ! if (ret == -2) { ! 
write_msg(NULL, "SQL command to dump the contents of table \"%s\" failed: PQgetCopyData() failed.\n", classname); write_msg(NULL, "Error message from server: %s", PQerrorMessage(g_conn)); write_msg(NULL, "The command was: %s\n", q->data); exit_nicely(); Index: src/bin/psql/common.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/bin/psql/common.c,v retrieving revision 1.111 diff -u -c -r1.111 common.c *** src/bin/psql/common.c 22 Nov 2005 18:17:29 -0000 1.111 --- src/bin/psql/common.c 23 Dec 2005 13:30:10 -0000 *************** *** 844,857 **** * Returns true if the query executed successfully, false otherwise. */ static bool ! ProcessCopyResult(PGresult *results) { ! bool success = false; ! if (!results) return false; ! switch (PQresultStatus(results)) { case PGRES_TUPLES_OK: case PGRES_COMMAND_OK: --- 844,859 ---- * Returns true if the query executed successfully, false otherwise. */ static bool ! ProcessCopyResult(PGresult **results) { ! bool success, req_getres; ! if (!*results) return false; ! success = req_getres = false; ! ! switch (PQresultStatus(*results)) { case PGRES_TUPLES_OK: case PGRES_COMMAND_OK: *************** *** 861,871 **** break; case PGRES_COPY_OUT: ! success = handleCopyOut(pset.db, pset.queryFout); break; case PGRES_COPY_IN: ! success = handleCopyIn(pset.db, pset.cur_cmd_source); break; default: --- 863,873 ---- break; case PGRES_COPY_OUT: ! req_getres = success = handleCopyOut(pset.db, pset.queryFout); break; case PGRES_COPY_IN: ! req_getres = success = handleCopyIn(pset.db, pset.cur_cmd_source); break; default: *************** *** 875,880 **** --- 877,894 ---- /* may need this to recover from conn loss during COPY */ if (!CheckConnection()) return false; + + /* + * We need a last PQgetResult() call to learn whether COPY + * succeeded or not. 
+ */ + if (req_getres) + { + PQclear(*results); + *results = PQgetResult(pset.db); + if (PQresultStatus(*results) != PGRES_COMMAND_OK) + success = false; + } return success; } *************** *** 1058,1064 **** results = PQexec(pset.db, query); /* these operations are included in the timing result: */ ! OK = (AcceptResult(results, query) && ProcessCopyResult(results)); if (pset.timing) GETTIMEOFDAY(&after); --- 1072,1078 ---- results = PQexec(pset.db, query); /* these operations are included in the timing result: */ ! OK = (AcceptResult(results, query) && ProcessCopyResult(&results)); if (pset.timing) GETTIMEOFDAY(&after); Index: src/bin/psql/copy.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/bin/psql/copy.c,v retrieving revision 1.58 diff -u -c -r1.58 copy.c *** src/bin/psql/copy.c 15 Oct 2005 02:49:40 -0000 1.58 --- src/bin/psql/copy.c 23 Dec 2005 13:30:11 -0000 *************** *** 585,656 **** return success; } ! #define COPYBUFSIZ 8192 /* size doesn't matter */ ! /* * handleCopyOut * receives data as a result of a COPY ... TO stdout command - * - * If you want to use COPY TO in your application, this is the code to steal :) - * - * conn should be a database connection that you just called COPY TO on - * (and which gave you PGRES_COPY_OUT back); - * copystream is the file stream you want the output to go to */ bool handleCopyOut(PGconn *conn, FILE *copystream) { ! bool copydone = false; /* haven't started yet */ ! char copybuf[COPYBUFSIZ]; ! int ret; ! while (!copydone) { ! ret = PQgetline(conn, copybuf, COPYBUFSIZ); ! if (copybuf[0] == '\\' && ! copybuf[1] == '.' && ! copybuf[2] == '\0') { ! copydone = true; /* we're at the end */ } ! else { ! fputs(copybuf, copystream); ! switch (ret) ! { ! case EOF: ! copydone = true; ! /* FALLTHROUGH */ ! case 0: ! fputc('\n', copystream); ! break; ! case 1: ! break; ! } } } fflush(copystream); - ret = !PQendcopy(conn); ResetCancelConn(); ! 
return ret; } - - /* * handleCopyIn * receives data as a result of a COPY ... FROM stdin command - * - * Again, if you want to use COPY FROM in your application, copy this. - * - * conn should be a database connection that you just called COPY FROM on - * (and which gave you PGRES_COPY_IN back); - * copystream is the file stream you want the input to come from */ - bool handleCopyIn(PGconn *conn, FILE *copystream) { --- 585,651 ---- return success; } + /* + * Function routines for handling COPY IN/OUT input/output. + * + * If you want to use COPY TO in your application, this is the + * code to steal ;) + * + * conn should be a database connection on which you just issued COPY FROM/TO, + * and copystream is the file stream to read input from or write output to. + * + * PGRES_COPY_IN and PGRES_COPY_OUT results will be untouched, so you + * should make a PQgetResult() call at the end to learn whether COPY + * succeeded or not. (This will bring COPY command's commandTag string too.) + */ ! /* chunk size will be used during COPY IN - size doesn't matter */ ! #define COPYBUFSIZ 8192 /* * handleCopyOut * receives data as a result of a COPY ... TO stdout command */ bool handleCopyOut(PGconn *conn, FILE *copystream) { ! char *buf; ! int ret; ! bool res = true; ! for (;;) { ! ret = PQgetCopyData(conn, &buf, 0); ! /* libpq allocated buf; write all ret bytes, since binary COPY data may contain NULs */ ! if (ret > 0) { ! fwrite(buf, 1, ret, copystream); ! PQfreemem(buf); } ! ! /* copy done */ ! else if (ret == -1) ! break; ! ! /* oops */ ! else if (ret == -2) { ! res = false; ! break; } } + fflush(copystream); ResetCancelConn(); ! ! return res; } /* * handleCopyIn * receives data as a result of a COPY ... FROM stdin command */ bool handleCopyIn(PGconn *conn, FILE *copystream) { *************** *** 658,670 **** bool copydone = false; bool firstload; bool linedone; ! bool saw_cr = false; ! 
char copybuf[COPYBUFSIZ]; char *s; - int bufleft; - int c = 0; int ret; - unsigned int linecount = 0; /* Prompt if interactive input */ if (isatty(fileno(copystream))) --- 653,661 ---- bool copydone = false; bool firstload; bool linedone; ! char buf[COPYBUFSIZ]; char *s; int ret; /* Prompt if interactive input */ if (isatty(fileno(copystream))) *************** *** 684,747 **** fputs(prompt, stdout); fflush(stdout); } firstload = true; linedone = false; while (!linedone) { /* for each bufferload in line ... */ ! /* Fetch string until \n, EOF, or buffer full */ ! s = copybuf; ! for (bufleft = COPYBUFSIZ - 1; bufleft > 0; bufleft--) { ! c = getc(copystream); ! if (c == EOF) ! { ! linedone = true; ! break; ! } ! *s++ = c; ! if (c == '\n') { ! linedone = true; ! break; } ! if (c == '\r') ! saw_cr = true; ! } ! *s = '\0'; ! /* EOF with empty line-so-far? */ ! if (c == EOF && s == copybuf && firstload) ! { ! /* ! * We are guessing a little bit as to the right line-ending ! * here... ! */ ! if (saw_cr) ! PQputline(conn, "\\.\r\n"); ! else ! PQputline(conn, "\\.\n"); copydone = true; - if (pset.cur_cmd_interactive) - puts("\\."); break; } ! /* No, so pass the data to the backend */ ! PQputline(conn, copybuf); ! /* Check for line consisting only of \. */ if (firstload) { ! if (strcmp(copybuf, "\\.\n") == 0 || ! strcmp(copybuf, "\\.\r\n") == 0) { copydone = true; break; } firstload = false; } } ! linecount++; } ! ret = !PQendcopy(conn); ! pset.lineno += linecount; ! return ret; } --- 675,725 ---- fputs(prompt, stdout); fflush(stdout); } + firstload = true; linedone = false; while (!linedone) { /* for each bufferload in line ... */ ! s = fgets(buf, COPYBUFSIZ, copystream); ! if (!s) { ! if (ferror(copystream)) { ! (void) PQputCopyEnd(conn, "Due to fgets() failure!"); ! return false; } ! copydone = true; break; } ! ! /* Locating EOF. (Required for below and buf length.) */ ! s = memchr(buf, '\0', COPYBUFSIZ); ! ! /* current line is done. */ ! if (*(s-1) == '\n') ! 
linedone = true; ! if (firstload) { ! if (strcmp(buf, "\\.\n") == 0 || ! strcmp(buf, "\\.\r\n") == 0) { copydone = true; break; } + firstload = false; } + + ret = PQputCopyData(conn, buf, (s-buf)); + if (ret < 0) + return false; } ! ! pset.lineno++; } ! ! return PQputCopyEnd(conn, NULL) < 0 ? false : true; } Index: src/include/commands/copy.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/include/commands/copy.h,v retrieving revision 1.25 diff -u -c -r1.25 copy.h *** src/include/commands/copy.h 31 Dec 2004 22:03:28 -0000 1.25 --- src/include/commands/copy.h 23 Dec 2005 13:30:11 -0000 *************** *** 16,22 **** #include "nodes/parsenodes.h" ! ! extern void DoCopy(const CopyStmt *stmt); #endif /* COPY_H */ --- 16,21 ---- #include "nodes/parsenodes.h" ! extern uint64 DoCopy(const CopyStmt *stmt); #endif /* COPY_H */ Index: src/interfaces/libpq/fe-exec.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v retrieving revision 1.177 diff -u -c -r1.177 fe-exec.c *** src/interfaces/libpq/fe-exec.c 22 Nov 2005 18:17:32 -0000 1.177 --- src/interfaces/libpq/fe-exec.c 23 Dec 2005 13:30:15 -0000 *************** *** 2177,2183 **** char * PQcmdTuples(PGresult *res) { ! char *p; if (!res) return ""; --- 2177,2183 ---- char * PQcmdTuples(PGresult *res) { ! char *p, *c; if (!res) return ""; *************** *** 2195,2201 **** p = res->cmdStatus + 6; else if (strncmp(res->cmdStatus, "FETCH ", 6) == 0) p = res->cmdStatus + 5; ! else if (strncmp(res->cmdStatus, "MOVE ", 5) == 0) p = res->cmdStatus + 4; else return ""; --- 2195,2202 ---- p = res->cmdStatus + 6; else if (strncmp(res->cmdStatus, "FETCH ", 6) == 0) p = res->cmdStatus + 5; ! else if (strncmp(res->cmdStatus, "MOVE ", 5) == 0 || ! strncmp(res->cmdStatus, "COPY ", 5) == 0) p = res->cmdStatus + 4; else return ""; *************** *** 2203,2216 **** p++; if (*p == 0) ! { ! 
pqInternalNotice(&res->noticeHooks, ! "could not interpret result from server: %s", ! res->cmdStatus); ! return ""; ! } ! ! return p; } /* --- 2204,2222 ---- p++; if (*p == 0) ! goto interpret_error; ! ! /* check if we have an int */ ! for (c = p; isdigit((unsigned char) *c); ++c) ! ; ! if (*c == 0) ! return p; ! ! interpret_error: ! pqInternalNotice(&res->noticeHooks, ! "could not interpret result from server: %s", ! res->cmdStatus); ! return ""; } /*