Index: src/backend/commands/copy.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/commands/copy.c,v retrieving revision 1.266 diff -c -r1.266 copy.c *** src/backend/commands/copy.c 26 May 2006 22:50:02 -0000 1.266 --- src/backend/commands/copy.c 31 May 2006 08:52:42 -0000 *************** *** 47,53 **** #include "utils/memutils.h" #include "utils/relcache.h" #include "utils/syscache.h" ! #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) #define OCTVALUE(c) ((c) - '0') --- 47,53 ---- #include "utils/memutils.h" #include "utils/relcache.h" #include "utils/syscache.h" ! #include "utils/pg_lzcompress.h" #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7')) #define OCTVALUE(c) ((c) - '0') *************** *** 103,114 **** --- 103,121 ---- int client_encoding; /* remote side's character encoding */ bool need_transcoding; /* client encoding diff from server? */ bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + bool do_compress; /* compress data before writing to output */ + bool do_flush; /* flush fe_msgbuf to copy target file/pipe */ + bool use_raw_buf; /* use raw buffered data for CopyGetData */ uint64 processed; /* # of tuples processed */ + uint64 progress; /* progress notice each # tuples processed */ + + MemoryContext oldcontext; /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ List *attnumlist; /* integer list of attnums to copy */ bool binary; /* binary format? */ + bool compression; /* binary compressed format? */ bool oids; /* include OIDs? */ bool csv_mode; /* Comma Separated Value format? */ bool header_line; /* CSV header line? */ *************** *** 153,162 **** * converts it. Note: we guarantee that there is a \0 at * raw_buf[raw_buf_len]. */ ! #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ } CopyStateData; typedef CopyStateData *CopyState; --- 160,170 ---- * converts it. Note: we guarantee that there is a \0 at * raw_buf[raw_buf_len]. */ ! #define RAW_BUF_SIZE 65536 /* initially, we palloc RAW_BUF_SIZE+1 bytes */ char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ + int raw_buf_size; /* actual raw_buf_size */ } CopyStateData; typedef CopyStateData *CopyState; *************** *** 260,265 **** --- 268,276 ---- static void CopySendEndOfRow(CopyState cstate); static int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread); + static bool CopyLoadRawBuf(CopyState cstate); + static int CopyLoadBuf(CopyState cstate, void *databuf, + int minread, int maxread); static void CopySendInt32(CopyState cstate, int32 val); static bool CopyGetInt32(CopyState cstate, int32 *val); static void CopySendInt16(CopyState cstate, int16 val); *************** *** 409,442 **** static void CopySendEndOfRow(CopyState cstate) { StringInfo fe_msgbuf = cstate->fe_msgbuf; ! switch (cstate->copy_dest) { ! case COPY_FILE: ! if (!cstate->binary) ! { ! /* Default line termination depends on platform */ ! #ifndef WIN32 ! CopySendChar(cstate, '\n'); #else ! CopySendString(cstate, "\r\n"); #endif ! } ! (void) fwrite(fe_msgbuf->data, fe_msgbuf->len, ! 1, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to COPY file: %m"))); break; case COPY_OLD_FE: ! /* The FE/BE protocol uses \n as newline for all platforms */ ! if (!cstate->binary) ! CopySendChar(cstate, '\n'); ! ! if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)) { /* no hope of recovering connection sync, so FATAL */ ereport(FATAL, --- 420,497 ---- static void CopySendEndOfRow(CopyState cstate) { + PGLZ_Header *tmp=0; StringInfo fe_msgbuf = cstate->fe_msgbuf; + + void *data; + int len; + bool writeUncompressed = false; ! if (!cstate->binary) { ! /* Default line termination depends on platform */ ! #ifdef WIN32 ! if (cstate->copy_dest == COPY_FILE) ! CopySendString(cstate, "\r\n"); ! else #else ! /* The FE/BE protocol uses \n as newline for all platforms */ ! CopySendChar(cstate, '\n'); #endif ! } ! ! ! if (cstate->do_compress) ! { ! if (!cstate->do_flush && fe_msgbuf->len < RAW_BUF_SIZE) ! { ! /* Wait for some more data until we compress and write out */ ! return; ! } ! ! tmp = (PGLZ_Header *) palloc(PGLZ_MAX_OUTPUT(fe_msgbuf->len)); ! ! #if 1 ! (void) pglz_compress(fe_msgbuf->data, fe_msgbuf->len, tmp, NULL); ! #else /* simulate non-compressible data, test compression performance */ ! tmp->varsize = fe_msgbuf->len + sizeof(PGLZ_Header); ! tmp->rawsize = fe_msgbuf->len; ! #endif ! data = tmp; ! ! if (PGLZ_IS_COMPRESSED(tmp)) ! len = tmp->varsize; ! else ! { ! // incompressible data ! len = sizeof(PGLZ_Header); ! writeUncompressed = true; ! } ! } ! else ! { ! data = fe_msgbuf->data; ! len = fe_msgbuf->len; ! } ! switch (cstate->copy_dest) ! { ! case COPY_FILE: ! (void) fwrite(data, len, 1, cstate->copy_file); ! if (ferror(cstate->copy_file)) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not write to COPY file: %m"))); ! if (writeUncompressed) ! (void) fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to COPY file: %m"))); break; case COPY_OLD_FE: ! if (pq_putbytes(data, len) || ! (writeUncompressed && pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))) { /* no hope of recovering connection sync, so FATAL */ ereport(FATAL, *************** *** 445,459 **** } break; case COPY_NEW_FE: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->binary) - CopySendChar(cstate, '\n'); - /* Dump the accumulated row as one CopyData message */ ! (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; } /* Reset fe_msgbuf to empty */ fe_msgbuf->len = 0; fe_msgbuf->data[0] = '\0'; --- 500,515 ---- } break; case COPY_NEW_FE: /* Dump the accumulated row as one CopyData message */ ! (void) pq_putmessage('d', data, len); ! if (writeUncompressed) ! (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; } + if (tmp) + pfree(tmp); + /* Reset fe_msgbuf to empty */ fe_msgbuf->len = 0; fe_msgbuf->data[0] = '\0'; *************** *** 475,480 **** --- 531,564 ---- static int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) { + if (cstate->use_raw_buf) + { + int bytesread = 0; + + while (bytesread < minread) + { + int nbytes = cstate->raw_buf_len - cstate->raw_buf_index; + if (nbytes > maxread-bytesread) + nbytes = maxread-bytesread; + + memcpy((char*)databuf + bytesread, cstate->raw_buf + cstate->raw_buf_index, nbytes); + cstate->raw_buf_index += nbytes; + bytesread += nbytes; + + if (bytesread >= minread || !CopyLoadRawBuf(cstate)) + break; + } + + return bytesread; + } + else + return CopyLoadBuf(cstate, databuf, minread, maxread); + } + + + static int + CopyLoadBuf(CopyState cstate, void *databuf, int minread, int maxread) + { int bytesread = 0; switch (cstate->copy_dest) *************** *** 662,669 **** else nbytes = 0; /* no data need be saved */ ! inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes, ! 1, RAW_BUF_SIZE - nbytes); nbytes += inbytes; cstate->raw_buf[nbytes] = '\0'; cstate->raw_buf_index = 0; --- 746,816 ---- else nbytes = 0; /* no data need be saved */ ! if (cstate->do_compress) ! { ! PGLZ_Header pglzHdr; ! inbytes = CopyLoadBuf(cstate, &pglzHdr, sizeof(PGLZ_Header), sizeof(PGLZ_Header)); ! ! if (inbytes != sizeof(PGLZ_Header)) ! { ! ereport(ERROR, ! (errcode(ERRCODE_CONNECTION_FAILURE), ! errmsg("not enough data"))); ! } ! /* make sure raw_buf is big enough */ ! if (cstate->raw_buf_size < pglzHdr.rawsize + nbytes) ! { ! char *newbuf; ! MemoryContext rowContext; ! ! cstate->raw_buf_size = pglzHdr.rawsize + nbytes; ! ! /* raw_buf is allocated statement-wide */ ! rowContext = MemoryContextSwitchTo(cstate->oldcontext); ! newbuf=palloc(cstate->raw_buf_size+1); ! MemoryContextSwitchTo(rowContext); ! ! if (nbytes > 0) ! memcpy(newbuf, cstate->raw_buf, nbytes); ! ! pfree(cstate->raw_buf); ! cstate->raw_buf = newbuf; ! } ! ! if (PGLZ_IS_COMPRESSED(&pglzHdr)) ! { ! PGLZ_Header *tmp = (PGLZ_Header*)palloc(pglzHdr.varsize); ! memcpy(tmp, &pglzHdr, sizeof(PGLZ_Header)); ! ! inbytes = CopyLoadBuf(cstate, (char*)tmp + sizeof(PGLZ_Header), ! pglzHdr.varsize - sizeof(PGLZ_Header), pglzHdr.varsize - sizeof(PGLZ_Header)); ! if (inbytes != pglzHdr.varsize-sizeof(PGLZ_Header)) ! { ! ereport(ERROR, ! (errcode(ERRCODE_CONNECTION_FAILURE), ! errmsg("not enough data"))); ! } ! pglz_decompress(tmp, cstate->raw_buf + nbytes); ! inbytes = pglzHdr.rawsize; ! } ! else ! { ! /* not compressed */ ! inbytes = CopyLoadBuf(cstate, cstate->raw_buf + nbytes, ! pglzHdr.rawsize, pglzHdr.rawsize); ! if (inbytes != pglzHdr.rawsize) ! { ! ereport(ERROR, ! (errcode(ERRCODE_CONNECTION_FAILURE), ! errmsg("not enough data"))); ! } ! } ! } ! else ! { ! inbytes = CopyLoadBuf(cstate, cstate->raw_buf + nbytes, ! cstate->raw_buf_size - nbytes, cstate->raw_buf_size - nbytes); ! } nbytes += inbytes; cstate->raw_buf[nbytes] = '\0'; cstate->raw_buf_index = 0; *************** *** 733,738 **** --- 880,902 ---- errmsg("conflicting or redundant options"))); cstate->binary = intVal(defel->arg); } + else if (strcmp(defel->defname, "compression") == 0) + { + if (cstate->compression) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + cstate->compression = intVal(defel->arg); + cstate->binary = intVal(defel->arg); + } + else if (strcmp(defel->defname, "progress") == 0) + { + if (cstate->progress) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + cstate->progress = intVal(defel->arg); + } else if (strcmp(defel->defname, "oids") == 0) { if (cstate->oids) *************** *** 1009,1015 **** initStringInfo(&cstate->attribute_buf); initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; ! cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->processed = 0; --- 1173,1180 ---- initStringInfo(&cstate->attribute_buf); initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; ! cstate->raw_buf_size=RAW_BUF_SIZE; ! cstate->raw_buf = (char *) palloc(cstate->raw_buf_size+1); cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->processed = 0; *************** *** 1274,1287 **** --- 1439,1462 ---- /* Signature */ CopySendData(cstate, (char *) BinarySignature, 11); + /* Flags field */ tmp = 0; if (cstate->oids) tmp |= (1 << 16); + if (cstate->compression) + tmp |= (2 << 16); CopySendInt32(cstate, tmp); /* No header extension */ tmp = 0; CopySendInt32(cstate, tmp); + + if (cstate->compression) + { + CopySendEndOfRow(cstate); + /* from now on, rows will be compressed */ + cstate->do_compress = true; + } } else { *************** *** 1404,1413 **** } CopySendEndOfRow(cstate); - MemoryContextSwitchTo(oldcontext); ! cstate->processed++; } heap_endscan(scandesc); --- 1579,1589 ---- } CopySendEndOfRow(cstate); MemoryContextSwitchTo(oldcontext); ! cstate->processed++; + if (cstate->progress && (cstate->processed % cstate->progress) == 0) + ereport(NOTICE, (errmsg("COPY " UINT64_FORMAT, cstate->processed))); } heap_endscan(scandesc); *************** *** 1417,1422 **** --- 1593,1599 ---- /* Generate trailer for a binary copy */ CopySendInt16(cstate, -1); /* Need to flush out the trailer */ + cstate->do_flush = true; CopySendEndOfRow(cstate); } *************** *** 1563,1570 **** int *defmap; ExprState **defexprs; /* array of default att expressions */ ExprContext *econtext; /* used for ExecEvalExpr for default atts */ - MemoryContext oldcontext = CurrentMemoryContext; ErrorContextCallback errcontext; tupDesc = RelationGetDescr(cstate->rel); attr = tupDesc->attrs; --- 1740,1747 ---- int *defmap; ExprState **defexprs; /* array of default att expressions */ ExprContext *econtext; /* used for ExecEvalExpr for default atts */ ErrorContextCallback errcontext; + cstate->oldcontext = CurrentMemoryContext; tupDesc = RelationGetDescr(cstate->rel); attr = tupDesc->attrs; *************** *** 1677,1683 **** (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing flags)"))); file_has_oids = (tmp & (1 << 16)) != 0; ! tmp &= ~(1 << 16); if ((tmp >> 16) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), --- 1854,1861 ---- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing flags)"))); file_has_oids = (tmp & (1 << 16)) != 0; ! cstate->compression = (tmp & (2 << 16)) != 0; ! tmp &= ~(3 << 16); if ((tmp >> 16) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), *************** *** 1696,1701 **** --- 1874,1883 ---- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); } + + if (cstate->compression) + cstate->do_compress = true; + cstate->use_raw_buf = true; } if (file_has_oids && cstate->binary) *************** *** 1913,1919 **** HeapTupleSetOid(tuple, loaded_oid); /* Triggers and stuff need to be invoked in query context. */ ! MemoryContextSwitchTo(oldcontext); skip_tuple = false; --- 2095,2101 ---- HeapTupleSetOid(tuple, loaded_oid); /* Triggers and stuff need to be invoked in query context. */ ! MemoryContextSwitchTo(cstate->oldcontext); skip_tuple = false; *************** *** 1958,1970 **** * tuples inserted by an INSERT command. */ cstate->processed++; } } /* Done, clean up */ error_context_stack = errcontext.previous; ! MemoryContextSwitchTo(oldcontext); /* Execute AFTER STATEMENT insertion triggers */ ExecASInsertTriggers(estate, resultRelInfo); --- 2140,2159 ---- * tuples inserted by an INSERT command. */ cstate->processed++; + + if (cstate->progress && (cstate->processed % cstate->progress) == 0) + { + error_context_stack = errcontext.previous; + ereport(NOTICE, (errmsg("COPY " UINT64_FORMAT, cstate->processed))); + error_context_stack = &errcontext; + } } } /* Done, clean up */ error_context_stack = errcontext.previous; ! MemoryContextSwitchTo(cstate->oldcontext); /* Execute AFTER STATEMENT insertion triggers */ ExecASInsertTriggers(estate, resultRelInfo); Index: src/backend/parser/gram.y =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/parser/gram.y,v retrieving revision 2.545 diff -c -r2.545 gram.y *** src/backend/parser/gram.y 27 May 2006 17:38:45 -0000 2.545 --- src/backend/parser/gram.y 31 May 2006 08:52:56 -0000 *************** *** 366,372 **** CACHE CALLED CASCADE CASCADED CASE CAST CHAIN CHAR_P CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT ! COMMITTED CONNECTION CONSTRAINT CONSTRAINTS CONVERSION_P CONVERT COPY CREATE CREATEDB CREATEROLE CREATEUSER CROSS CSV CURRENT_DATE CURRENT_ROLE CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE --- 366,372 ---- CACHE CALLED CASCADE CASCADED CASE CAST CHAIN CHAR_P CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT ! COMMITTED COMPRESSION CONNECTION CONSTRAINT CONSTRAINTS CONVERSION_P CONVERT COPY CREATE CREATEDB CREATEROLE CREATEUSER CROSS CSV CURRENT_DATE CURRENT_ROLE CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE *************** *** 408,414 **** PARTIAL PASSWORD PLACING POSITION PRECISION PRESERVE PREPARE PREPARED PRIMARY ! PRIOR PRIVILEGES PROCEDURAL PROCEDURE QUOTE --- 408,414 ---- PARTIAL PASSWORD PLACING POSITION PRECISION PRESERVE PREPARE PREPARED PRIMARY ! PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRESS QUOTE *************** *** 1649,1654 **** --- 1649,1662 ---- { $$ = makeDefElem("binary", (Node *)makeInteger(TRUE)); } + | COMPRESSION + { + $$ = makeDefElem("compression", (Node *)makeInteger(TRUE)); + } + | PROGRESS opt_as Iconst + { + $$ = makeDefElem("progress", (Node *)makeInteger($3)); + } | OIDS { $$ = makeDefElem("oids", (Node *)makeInteger(TRUE)); *************** *** 8369,8374 **** --- 8377,8383 ---- | COMMENT | COMMIT | COMMITTED + | COMPRESSION | CONNECTION | CONSTRAINTS | CONVERSION_P *************** *** 8476,8481 **** --- 8485,8491 ---- | PRIVILEGES | PROCEDURAL | PROCEDURE + | PROGRESS | QUOTE | READ | REASSIGN Index: src/backend/parser/keywords.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/parser/keywords.c,v retrieving revision 1.171 diff -c -r1.171 keywords.c *** src/backend/parser/keywords.c 5 Mar 2006 15:58:32 -0000 1.171 --- src/backend/parser/keywords.c 31 May 2006 08:52:57 -0000 *************** *** 83,88 **** --- 83,89 ---- {"comment", COMMENT}, {"commit", COMMIT}, {"committed", COMMITTED}, + {"compression", COMPRESSION}, {"connection", CONNECTION}, {"constraint", CONSTRAINT}, {"constraints", CONSTRAINTS}, *************** *** 267,272 **** --- 268,274 ---- {"privileges", PRIVILEGES}, {"procedural", PROCEDURAL}, {"procedure", PROCEDURE}, + {"progress", PROGRESS}, {"quote", QUOTE}, {"read", READ}, {"real", REAL}, Index: src/bin/psql/copy.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/bin/psql/copy.c,v retrieving revision 1.61 diff -c -r1.61 copy.c *** src/bin/psql/copy.c 26 May 2006 19:51:29 -0000 1.61 --- src/bin/psql/copy.c 31 May 2006 08:53:00 -0000 *************** *** 62,70 **** --- 62,72 ---- bool psql_inout; /* true = use psql stdin/stdout */ bool from; bool binary; + bool compression; bool oids; bool csv_mode; bool header; + long progress; char *delim; char *null; char *quote; *************** *** 139,145 **** } result->table = pg_strdup(token); - token = strtokx(NULL, whitespace, ".,()", "\"", 0, false, pset.encoding); if (!token) --- 141,146 ---- *************** *** 283,288 **** --- 284,297 ---- result->oids = true; else if (pg_strcasecmp(token, "binary") == 0) result->binary = true; + else if (pg_strcasecmp(token, "compression") == 0) + result->compression = true; + else if (pg_strcasecmp(token, "progress") == 0) + { + token = strtokx(NULL, whitespace, NULL, NULL, + 0, false, pset.encoding); + result->progress = atol(token); + } else if (pg_strcasecmp(token, "csv") == 0) result->csv_mode = true; else if (pg_strcasecmp(token, "header") == 0) *************** *** 478,483 **** --- 487,497 ---- appendPQExpBuffer(&query, " WITH NULL AS '%s'", options->null); } + if (options->compression) + appendPQExpBuffer(&query, " COMPRESSION"); + if (options->progress) + appendPQExpBuffer(&query, " PROGRESS %ld", options->progress); + if (options->csv_mode) appendPQExpBuffer(&query, " CSV"); Index: src/include/utils/pg_lzcompress.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/include/utils/pg_lzcompress.h,v retrieving revision 1.11 diff -c -r1.11 pg_lzcompress.h *** src/include/utils/pg_lzcompress.h 25 May 2005 21:40:42 -0000 1.11 --- src/include/utils/pg_lzcompress.h 31 May 2006 08:53:02 -0000 *************** *** 56,62 **** * ---------- */ #define PGLZ_IS_COMPRESSED(_lzdata) ((_lzdata)->varsize != \ ! e (_lzdata)->rawsize + e \ sizeof(PGLZ_Header)) /* ---------- --- 56,62 ---- * ---------- */ #define PGLZ_IS_COMPRESSED(_lzdata) ((_lzdata)->varsize != \ ! (_lzdata)->rawsize + \ sizeof(PGLZ_Header)) /* ----------