Index: src/backend/Makefile =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/Makefile,v retrieving revision 1.114 diff -c -r1.114 Makefile *** src/backend/Makefile 5 Jan 2006 01:56:29 -0000 1.114 --- src/backend/Makefile 17 May 2006 16:16:20 -0000 *************** *** 25,31 **** LIBS := $(filter-out -lpgport, $(LIBS)) # The backend doesn't need everything that's in LIBS, however ! LIBS := $(filter-out -lz -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS)) ########################################################################## --- 25,31 ---- LIBS := $(filter-out -lpgport, $(LIBS)) # The backend doesn't need everything that's in LIBS, however ! LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS)) ########################################################################## Index: src/backend/utils/sort/logtape.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/utils/sort/logtape.c,v retrieving revision 1.21 diff -c -r1.21 logtape.c *** src/backend/utils/sort/logtape.c 7 Mar 2006 23:46:24 -0000 1.21 --- src/backend/utils/sort/logtape.c 17 May 2006 16:16:21 -0000 *************** *** 80,91 **** --- 80,96 ---- #include "storage/buffile.h" #include "utils/logtape.h" + #include "zlib.h" + /* * Block indexes are "long"s, so we can fit this many per indirect block. * NB: we assume this is an exact fit! */ #define BLOCKS_PER_INDIR_BLOCK ((int) (BLCKSZ / sizeof(long))) + /* compression level to use for zlib */ + #define COMPRESSION_LEVEL 3 + /* * We use a struct like this for each active indirection level of each * logical tape. If the indirect block is not the highest level of its *************** *** 131,136 **** --- 136,142 ---- * reading. */ char *buffer; /* physical buffer (separately palloc'd) */ + z_stream zstream; /* zlib stream */ long curBlockNumber; /* this block's logical blk# within tape */ int pos; /* next read/write position in buffer */ int nbytes; /* total # of valid bytes in buffer */ *************** *** 509,514 **** --- 515,532 ---- } + static void * + lts_zalloc(void *cxt, unsigned int items, unsigned int size) + { + return MemoryContextAlloc( cxt, items*size ); + } + + static void + lts_zfree(void *cxt, void *ptr) + { + return pfree( ptr ); + } + /* * Create a set of logical tapes in a temporary underlying file. * *************** *** 556,561 **** --- 574,586 ---- lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; + + memset( <->zstream, 0, sizeof(lt->zstream) ); + lt->zstream.zalloc = lts_zalloc; + lt->zstream.zfree = lts_zfree; + lt->zstream.opaque = CurrentMemoryContext; + + deflateInit( <->zstream, COMPRESSION_LEVEL ); /* Fast compression */ } return lts; } *************** *** 627,633 **** void *ptr, size_t size) { LogicalTape *lt; - size_t nthistime; Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; --- 652,657 ---- *************** *** 643,650 **** lt->indirect->nextup = NULL; } ! while (size > 0) { if (lt->pos >= BLCKSZ) { /* Buffer full, dump it out */ --- 667,678 ---- lt->indirect->nextup = NULL; } ! lt->zstream.next_in = ptr; ! lt->zstream.avail_in = size; ! ! while (lt->zstream.avail_in > 0) { + int err; if (lt->pos >= BLCKSZ) { /* Buffer full, dump it out */ *************** *** 661,680 **** lt->nbytes = 0; } ! nthistime = BLCKSZ - lt->pos; ! if (nthistime > size) ! nthistime = size; ! Assert(nthistime > 0); ! memcpy(lt->buffer + lt->pos, ptr, nthistime); lt->dirty = true; ! lt->pos += nthistime; ! if (lt->nbytes < lt->pos) ! lt->nbytes = lt->pos; ! ptr = (void *) ((char *) ptr + nthistime); ! size -= nthistime; } } /* --- 689,741 ---- lt->nbytes = 0; } ! lt->zstream.next_out = lt->buffer + lt->pos; ! lt->zstream.avail_out = BLCKSZ - lt->pos; ! err = deflate( <->zstream, Z_NO_FLUSH ); ! lt->pos = BLCKSZ - lt->zstream.avail_out; ! lt->nbytes = lt->pos; ! lt->dirty = true; ! ! if( err != Z_OK ) ! elog(ERROR, "deflate returned error: %d\n", err ); ! } ! } ! /* Flushes any data still present in the buffers of the compression module. ! * Only necessary for writing (deflating), for reading we don't particularly ! * care about unflished data */ + static void + ltsFlush( LogicalTapeSet *lts, LogicalTape *lt) + { + int err; + + if( !lt->dirty ) + return; + + lt->zstream.next_in = NULL; + lt->zstream.avail_in = 0; + + for(;;) + { + lt->zstream.next_out = lt->buffer + lt->pos; + lt->zstream.avail_out = BLCKSZ - lt->pos; + err = deflate( <->zstream, Z_FINISH ); lt->dirty = true; ! ! if( err == Z_STREAM_END ) ! break; ! Assert(err == Z_OK); ! Assert(lt->zstream.avail_out == 0); ! ltsDumpBuffer(lts, lt); ! lt->numFullBlocks++; ! lt->curBlockNumber++; ! lt->nbytes = 0; ! lt->pos = 0; } + lt->pos = BLCKSZ - lt->zstream.avail_out; + lt->nbytes = lt->pos; + return; } /* *************** *** 692,697 **** --- 753,761 ---- Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; + lt->zstream.next_in = NULL; + lt->zstream.avail_in = 0; + if (!forWrite) { if (lt->writing) *************** *** 702,711 **** --- 766,781 ---- * (destructive) read. */ if (lt->dirty) + { + ltsFlush(lts, lt); ltsDumpBuffer(lts, lt); + } lt->lastBlockBytes = lt->nbytes; lt->writing = false; datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false); + + deflateEnd( <->zstream ); + inflateInit( <->zstream ); } else { *************** *** 715,720 **** --- 785,791 ---- */ Assert(lt->frozen); datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect); + inflateReset( <->zstream ); } /* Read the first block, or reset if tape is empty */ lt->curBlockNumber = 0L; *************** *** 761,766 **** --- 832,840 ---- lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; + + inflateEnd( <->zstream ); + deflateInit( <->zstream, COMPRESSION_LEVEL ); } } *************** *** 774,788 **** void *ptr, size_t size) { LogicalTape *lt; ! size_t nread = 0; ! size_t nthistime; Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; Assert(!lt->writing); ! while (size > 0) { if (lt->pos >= lt->nbytes) { /* Try to load more data into buffer. */ --- 848,865 ---- void *ptr, size_t size) { LogicalTape *lt; ! // size_t nread = 0; Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; Assert(!lt->writing); ! lt->zstream.next_out = ptr; ! lt->zstream.avail_out = size; ! ! while (lt->zstream.avail_out > 0) { + int err; if (lt->pos >= lt->nbytes) { /* Try to load more data into buffer. */ *************** *** 801,821 **** if (lt->nbytes <= 0) break; /* EOF (possible here?) */ } ! ! nthistime = lt->nbytes - lt->pos; ! if (nthistime > size) ! nthistime = size; ! Assert(nthistime > 0); ! ! memcpy(ptr, lt->buffer + lt->pos, nthistime); ! ! lt->pos += nthistime; ! ptr = (void *) ((char *) ptr + nthistime); ! size -= nthistime; ! nread += nthistime; } ! return nread; } /* --- 878,895 ---- if (lt->nbytes <= 0) break; /* EOF (possible here?) */ } ! ! lt->zstream.next_in = lt->buffer + lt->pos; ! lt->zstream.avail_in = lt->nbytes - lt->pos; ! err = inflate( <->zstream, Z_NO_FLUSH ); ! lt->pos = lt->nbytes - lt->zstream.avail_in; ! /* Here we can detect the end of the compressed stream, ! * can't do anything with that information though */ ! if( err != Z_OK && err != Z_STREAM_END) ! elog(ERROR, "Inflate returned error: %d\n", err); } ! return size - lt->zstream.avail_out; } /* *************** *** 844,850 **** --- 918,927 ---- * partial indirect blocks, rewind for nondestructive read. */ if (lt->dirty) + { + ltsFlush(lts, lt); ltsDumpBuffer(lts, lt); + } lt->lastBlockBytes = lt->nbytes; lt->writing = false; lt->frozen = true; *************** *** 853,858 **** --- 930,939 ---- lt->curBlockNumber = 0L; lt->pos = 0; lt->nbytes = 0; + + deflateEnd( <->zstream ); + inflateInit( <->zstream ); + if (datablocknum != -1L) { ltsReadBlock(lts, datablocknum, (void *) lt->buffer); *************** *** 879,884 **** --- 960,967 ---- long nblocks; int newpos; + elog(PANIC,"LogicalTapeBackspace on compressed file not yet implemented"); + Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; Assert(lt->frozen); *************** *** 944,949 **** --- 1027,1034 ---- { LogicalTape *lt; + elog(PANIC,"LogicalTapeSeek on compressed file not yet implemented"); + Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; Assert(lt->frozen); *************** *** 1007,1012 **** --- 1092,1099 ---- { LogicalTape *lt; + elog(PANIC,"LogicalTapeTell on compressed file not yet implemented"); + Assert(tapenum >= 0 && tapenum < lts->nTapes); lt = <s->tapes[tapenum]; *blocknum = lt->curBlockNumber;