Index: src/backend/access/heap/heapam.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/heap/heapam.c,v
retrieving revision 1.230
diff -c -r1.230 heapam.c
*** src/backend/access/heap/heapam.c	29 Mar 2007 00:15:37 -0000	1.230
--- src/backend/access/heap/heapam.c	29 Mar 2007 08:27:21 -0000
***************
*** 3280,3285 ****
--- 3280,3322 ----
  	return log_heap_update(reln, oldbuf, from, newbuf, newtup, true);
  }
  
+ /*
+  * Perofrm XLogInsert of a full page image to WAL. Caller must make sure
+  * the page contents on disk are consistent with the page inserted to WAL.
+  */
+ XLogRecPtr
+ log_newpage(RelFileNode *rnode, BlockNumber blkno, Page page)
+ {
+ 	xl_heap_newpage xlrec;
+ 	XLogRecPtr	recptr;
+ 	XLogRecData rdata[2];
+ 
+ 	/* NO ELOG(ERROR) from here till newpage op is logged */
+ 	START_CRIT_SECTION();
+ 
+ 	xlrec.node = *rnode;
+ 	xlrec.blkno = blkno;
+ 
+ 	rdata[0].data = (char *) &xlrec;
+ 	rdata[0].len = SizeOfHeapNewpage;
+ 	rdata[0].buffer = InvalidBuffer;
+ 	rdata[0].next = &(rdata[1]);
+ 
+ 	rdata[1].data = (char *) page;
+ 	rdata[1].len = BLCKSZ;
+ 	rdata[1].buffer = InvalidBuffer;
+ 	rdata[1].next = NULL;
+ 
+ 	recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
+ 
+ 	PageSetLSN(page, recptr);
+ 	PageSetTLI(page, ThisTimeLineID);
+ 
+ 	END_CRIT_SECTION();
+ 
+ 	return recptr;
+ }
+ 
  static void
  heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
  {
Index: src/backend/access/nbtree/nbtsort.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/access/nbtree/nbtsort.c,v
retrieving revision 1.110
diff -c -r1.110 nbtsort.c
*** src/backend/access/nbtree/nbtsort.c	9 Jan 2007 02:14:10 -0000	1.110
--- src/backend/access/nbtree/nbtsort.c	20 Mar 2007 17:58:20 -0000
***************
*** 64,69 ****
--- 64,70 ----
  
  #include "postgres.h"
  
+ #include "access/heapam.h"
  #include "access/nbtree.h"
  #include "miscadmin.h"
  #include "storage/smgr.h"
***************
*** 265,296 ****
  	if (wstate->btws_use_wal)
  	{
  		/* We use the heap NEWPAGE record type for this */
! 		xl_heap_newpage xlrec;
! 		XLogRecPtr	recptr;
! 		XLogRecData rdata[2];
! 
! 		/* NO ELOG(ERROR) from here till newpage op is logged */
! 		START_CRIT_SECTION();
! 
! 		xlrec.node = wstate->index->rd_node;
! 		xlrec.blkno = blkno;
! 
! 		rdata[0].data = (char *) &xlrec;
! 		rdata[0].len = SizeOfHeapNewpage;
! 		rdata[0].buffer = InvalidBuffer;
! 		rdata[0].next = &(rdata[1]);
! 
! 		rdata[1].data = (char *) page;
! 		rdata[1].len = BLCKSZ;
! 		rdata[1].buffer = InvalidBuffer;
! 		rdata[1].next = NULL;
! 
! 		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
! 
! 		PageSetLSN(page, recptr);
! 		PageSetTLI(page, ThisTimeLineID);
! 
! 		END_CRIT_SECTION();
  	}
  	else
  	{
--- 266,272 ----
  	if (wstate->btws_use_wal)
  	{
  		/* We use the heap NEWPAGE record type for this */
! 		log_newpage(&wstate->index->rd_node, blkno, page);
  	}
  	else
  	{
Index: src/backend/commands/Makefile
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/Makefile,v
retrieving revision 1.35
diff -c -r1.35 Makefile
*** src/backend/commands/Makefile	20 Jan 2007 17:16:11 -0000	1.35
--- src/backend/commands/Makefile	20 Mar 2007 17:04:58 -0000
***************
*** 16,22 ****
  	conversioncmds.o copy.o \
  	dbcommands.o define.o explain.o functioncmds.o \
  	indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \
! 	portalcmds.o prepare.o proclang.o \
  	schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \
  	typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o
  
--- 16,22 ----
  	conversioncmds.o copy.o \
  	dbcommands.o define.o explain.o functioncmds.o \
  	indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \
! 	rewriteheap.o portalcmds.o prepare.o proclang.o \
  	schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \
  	typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o
  
Index: src/backend/commands/cluster.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/cluster.c,v
retrieving revision 1.158
diff -c -r1.158 cluster.c
*** src/backend/commands/cluster.c	29 Mar 2007 00:15:37 -0000	1.158
--- src/backend/commands/cluster.c	29 Mar 2007 08:28:57 -0000
***************
*** 20,25 ****
--- 20,26 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/xact.h"
+ #include "access/transam.h"
  #include "catalog/catalog.h"
  #include "catalog/dependency.h"
  #include "catalog/heap.h"
***************
*** 36,41 ****
--- 37,44 ----
  #include "utils/memutils.h"
  #include "utils/syscache.h"
  #include "utils/relcache.h"
+ #include "storage/procarray.h"
+ #include "commands/rewriteheap.h"
  
  
  /*
***************
*** 653,659 ****
  	char	   *nulls;
  	IndexScanDesc scan;
  	HeapTuple	tuple;
! 	CommandId	mycid = GetCurrentCommandId();
  	bool		use_wal;
  
  	/*
--- 656,663 ----
  	char	   *nulls;
  	IndexScanDesc scan;
  	HeapTuple	tuple;
! 	TransactionId OldestXmin;
! 	RewriteState rwstate;
  	bool		use_wal;
  
  	/*
***************
*** 677,682 ****
--- 681,689 ----
  	nulls = (char *) palloc(natts * sizeof(char));
  	memset(nulls, 'n', natts * sizeof(char));
  
+ 	/* Get an OldestXmin to use to weed out dead tuples */
+ 	OldestXmin = GetOldestXmin(OldHeap->rd_rel->relisshared, false);
+ 
  	/*
  	 * We need to log the copied data in WAL iff WAL archiving is enabled AND
  	 * it's not a temp rel.  (Since we know the target relation is new and
***************
*** 688,724 ****
  	/* use_wal off requires rd_targblock be initially invalid */
  	Assert(NewHeap->rd_targblock == InvalidBlockNumber);
  
  	/*
  	 * Scan through the OldHeap on the OldIndex and copy each tuple into the
  	 * NewHeap.
  	 */
  	scan = index_beginscan(OldHeap, OldIndex,
! 						   SnapshotNow, 0, (ScanKey) NULL);
  
  	while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
  	{
  		/*
  		 * We cannot simply pass the tuple to heap_insert(), for several
  		 * reasons:
  		 *
! 		 * 1. heap_insert() will overwrite the commit-status fields of the
! 		 * tuple it's handed.  This would trash the source relation, which is
! 		 * bad news if we abort later on.  (This was a bug in releases thru
! 		 * 7.0)
! 		 *
! 		 * 2. We'd like to squeeze out the values of any dropped columns, both
  		 * to save space and to ensure we have no corner-case failures. (It's
  		 * possible for example that the new table hasn't got a TOAST table
  		 * and so is unable to store any large values of dropped cols.)
  		 *
! 		 * 3. The tuple might not even be legal for the new table; this is
  		 * currently only known to happen as an after-effect of ALTER TABLE
  		 * SET WITHOUT OIDS.
  		 *
  		 * So, we must reconstruct the tuple from component Datums.
  		 */
- 		HeapTuple	copiedTuple;
- 		int			i;
  
  		heap_deformtuple(tuple, oldTupDesc, values, nulls);
  
--- 695,743 ----
  	/* use_wal off requires rd_targblock be initially invalid */
  	Assert(NewHeap->rd_targblock == InvalidBlockNumber);
  
+ 	/* Initialize the rewrite operation */
+ 	rwstate = begin_heap_rewrite(NewHeap, OldestXmin, use_wal);
+ 
  	/*
  	 * Scan through the OldHeap on the OldIndex and copy each tuple into the
  	 * NewHeap.
  	 */
  	scan = index_beginscan(OldHeap, OldIndex,
! 						   SnapshotAny, 0, (ScanKey) NULL);
  
  	while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
  	{
+ 		HeapTuple	copiedTuple;
+ 		int			i;
+ 		bool		isdead;
+ 
+ 		LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
+ 		isdead = HeapTupleSatisfiesVacuum(tuple->t_data, 
+ 										  OldestXmin, 
+ 										  scan->xs_cbuf) == HEAPTUPLE_DEAD;
+ 		LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
+ 
+ 		if(isdead)
+ 		{
+ 			rewrite_heap_dead_tuple(rwstate, tuple);
+ 			continue;
+ 		}
+ 
  		/*
  		 * We cannot simply pass the tuple to heap_insert(), for several
  		 * reasons:
  		 *
! 		 * 1. We'd like to squeeze out the values of any dropped columns, both
  		 * to save space and to ensure we have no corner-case failures. (It's
  		 * possible for example that the new table hasn't got a TOAST table
  		 * and so is unable to store any large values of dropped cols.)
  		 *
! 		 * 2. The tuple might not even be legal for the new table; this is
  		 * currently only known to happen as an after-effect of ALTER TABLE
  		 * SET WITHOUT OIDS.
  		 *
  		 * So, we must reconstruct the tuple from component Datums.
  		 */
  
  		heap_deformtuple(tuple, oldTupDesc, values, nulls);
  
***************
*** 735,747 ****
  		if (NewHeap->rd_rel->relhasoids)
  			HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
  
! 		heap_insert(NewHeap, copiedTuple, mycid, use_wal, false);
  
  		heap_freetuple(copiedTuple);
  
  		CHECK_FOR_INTERRUPTS();
  	}
  
  	index_endscan(scan);
  
  	pfree(values);
--- 754,768 ----
  		if (NewHeap->rd_rel->relhasoids)
  			HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
  
! 		rewrite_heap_tuple(rwstate, tuple, copiedTuple);
  
  		heap_freetuple(copiedTuple);
  
  		CHECK_FOR_INTERRUPTS();
  	}
  
+ 	end_heap_rewrite(rwstate);
+ 
  	index_endscan(scan);
  
  	pfree(values);
Index: src/backend/commands/rewriteheap.c
===================================================================
RCS file: src/backend/commands/rewriteheap.c
diff -N src/backend/commands/rewriteheap.c
*** /dev/null	1 Jan 1970 00:00:00 -0000
--- src/backend/commands/rewriteheap.c	29 Mar 2007 08:31:00 -0000
***************
*** 0 ****
--- 1,543 ----
+ /*-------------------------------------------------------------------------
+  *
+  * rewriteheap.c
+  *	  support functions to rewrite tables.
+  *
+  * These functions provide a facility to completely rewrite a heap, while
+  * preserving visibility information and update chains. 
+  *
+  *
+  * INTERFACE
+  *
+  * The caller is responsible for creating the new heap, all catalog 
+  * changes, supplying the tuples to be written to the new heap, and 
+  * rebuilding indexes. The caller must hold an AccessExclusiveLock on the 
+  * table.
+  *
+  * To use the facility:
+  *
+  * begin_heap_rewrite
+  * while(fetch next tuple)
+  * {
+  *     if(tuple is dead)
+  *         rewrite_heap_dead_tuple
+  *     else
+  *     {
+  *         do any transformations here if required
+  *         rewrite_heap_tuple
+  *     }
+  * }
+  * end_heap_rewrite
+  *
+  * The rewrite facility holds the current page it's inserting into pinned
+  * and locked over rewrite_heap_tuple calls. To avoid deadlock, the caller
+  * must not access the new relation while a rewrite is in progress.
+  *
+  * The contents of the new relation shouldn't be relied on until 
+  * end_heap_rewrite is called.
+  *
+  *
+  * IMPLEMENTATION
+  *
+  * While we copy the tuples, we need to keep track of ctid-references to
+  * handle update chains correctly. Since the new versions of tuples will 
+  * have different ctids than the original ones, if we just copied 
+  * everything to the new table the ctid pointers would be wrong. For each 
+  * ctid reference from A -> B, we can encounter either A first or B first:
+  *
+  * If we encounter A first, we'll store the tuple in the unresolved_ctids
+  * hash table. When we later encounter B, we remove A from the hash table,
+  * fix the ctid to point to the new location of B, and insert both A and B
+  * to the new heap.
+  *
+  * If we encounter B first, we'll put an entry in the old_new_tid_map hash
+  * table and insert B to the new heap right away. When we later encounter 
+  * A, we get the new location of B from the table.
+  *
+  * Note that a tuple in the middle of a chain is both A and B at the same
+  * time. Entries in the hash tables can be removed as soon as the later
+  * tuple is encountered. That helps to keep the memory usage down. At the 
+  * end, both tables are usually empty; we should have encountered both A 
+  * and B for each ctid reference. However, if there's a chain like this:
+  * RECENTLY_DEAD -> DEAD -> LIVE as determined by HeapTupleSatisfiesVacuum,
+  * we can encounter the dead tuple first, and skip it, and bump into the
+  * RECENTLY_DEAD tuple later. The RECENTLY_DEAD tuple would be added
+  * to unresolved_ctids, and stay there until end of the rewrite.
+  *
+  * Using in-memory hash tables means that we use some memory for each live
+  * update chain in the table, from the time we find one end of the
+  * reference until we find the other end. That shouldn't be a problem in
+  * practice, but if you do something like an UPDATE without a where-clause
+  * on a large table, and then run CLUSTER in the same transaction, you
+  * might run out of memory. It doesn't seem worthwhile to add support to
+  * spill-to-disk, there shouldn't be that many RECENTLY_DEAD tuples in a
+  * table under normal circumstances, and even if we do fail halfway through
+  * a CLUSTER, the old table is still valid.
+  *
+  * We can't use the normal heap_insert function to insert into the new
+  * heap, because heap_insert overwrites the visibility information.
+  * We use a special-purpose raw_heap_insert function instead, which
+  * is optimized for bulk inserting a lot of tuples, knowing that we have
+  * exclusive access to the heap. raw_heap_insert keeps the current buffer
+  * we're inserting into locked over calls, until it gets full. The page 
+  * is inserted to WAL as a single record.
+  *
+  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994-5, Regents of the University of California
+  *
+  * IDENTIFICATION
+  *	  $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/heapam.h"
+ #include "access/transam.h"
+ #include "access/tuptoaster.h"
+ #include "commands/rewriteheap.h"
+ #include "utils/memutils.h"
+ 
+ /*
+  * State associated with a rewrite operation. This is opaque to the user
+  * of the rewrite facility.
+  */
+ typedef struct RewriteStateData
+ {
+ 	Relation		rs_new_rel;			/* new heap */
+ 	bool			rs_use_wal;			/* WAL-log inserts to new heap? */
+ 	MemoryContext	rs_cxt;				/* for hash tables and entries and 
+ 										 * tuples in them */
+ 	TransactionId	rs_oldest_xmin;		/* oldest xmin used by caller to
+ 										 * determine tuple visibility */
+ 	Buffer			rs_buf;				/* current buffer we're inserting
+ 										 * to, exclusively locked */
+ 	HTAB		   *rs_unresolved_ctids;
+ 	HTAB		   *rs_old_new_tid_map;
+ } RewriteStateData;
+ 
+ /* entry structures for the hash tables */
+ 
+ typedef struct UnresolvedCtidData {
+ 	ItemPointerData old_ctid;	/* t_ctid-pointer of the tuple in old heap */
+ 	ItemPointerData old_tid;	/* location in the old heap */
+ 	HeapTuple tuple;			/* tuple contents */
+ } UnresolvedCtidData;
+ 
+ typedef UnresolvedCtidData *UnresolvedCtid;
+ 
+ typedef struct OldToNewMappingData {
+ 	ItemPointerData old_tid;	/* location in the old heap */
+ 	ItemPointerData new_tid;	/* location in the new heap */
+ } OldToNewMappingData;
+ 
+ typedef OldToNewMappingData *OldToNewMapping;
+ 
+ 
+ /* prototypes for internal functions */
+ static void raw_heap_insert(RewriteState state, HeapTuple tup);
+ 
+ 
+ /*
+  * Begin a rewrite of a table
+  *
+  * new_heap		new, empty heap relation to insert tuples to
+  * oldest_xmin	xid used by the caller to determine which tuples
+  *				are dead.
+  * use_wal		should the inserts to the new heap be WAL-logged
+  *
+  * Returns an opaque RewriteState, allocated in current memory context,
+  * to be used in subseuqent calls to the other functions.
+  */
+ RewriteState
+ begin_heap_rewrite(Relation new_heap, TransactionId oldest_xmin,
+ 				   bool use_wal)
+ {
+ 	RewriteState state;
+ 	HASHCTL		hash_ctl;
+ 
+ 	state = palloc(sizeof(RewriteStateData));
+ 	state->rs_cxt = AllocSetContextCreate(CurrentMemoryContext,
+ 										  "Table rewrite",
+ 										  ALLOCSET_DEFAULT_MINSIZE,
+ 										  ALLOCSET_DEFAULT_INITSIZE,
+ 										  ALLOCSET_DEFAULT_MAXSIZE);
+ 
+ 	state->rs_oldest_xmin = oldest_xmin;
+ 	state->rs_new_rel = new_heap;
+ 	state->rs_use_wal = use_wal;
+ 	state->rs_buf = InvalidBuffer;
+ 
+ 	/* Initialize hash tables used to track update chains */
+ 	memset(&hash_ctl, 0, sizeof(hash_ctl));
+ 	hash_ctl.keysize = sizeof(ItemPointerData);
+ 	hash_ctl.entrysize = sizeof(UnresolvedCtidData);
+ 	hash_ctl.hcxt = state->rs_cxt;
+ 	hash_ctl.hash = tag_hash;
+ 
+ 	state->rs_unresolved_ctids =
+ 		hash_create("Rewrite / Unresolved ctids",
+ 					128, /* arbitrary initial size */
+ 					&hash_ctl,
+ 					HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+ 
+ 	hash_ctl.entrysize = sizeof(OldToNewMappingData);
+ 	
+ 	state->rs_old_new_tid_map =
+ 		hash_create("Rewrite / Old to new tid map",
+ 					128, /* arbitrary initial size */
+ 					&hash_ctl,
+ 					HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+ 
+ 	return state;
+ }
+ 
+ /*
+  * End a rewrite.
+  *
+  * state and any other resources are freed.
+  */
+ void
+ end_heap_rewrite(RewriteState state)
+ {
+ 	HASH_SEQ_STATUS seq_status;
+ 	UnresolvedCtid unresolved;
+ 
+ 	/* Write any remaining tuples in the unresolvedCtids table. If we have
+ 	 * any left, they should in fact be dead, but let's err on the safe 
+ 	 * side.
+ 	 */
+ 	hash_seq_init(&seq_status, state->rs_unresolved_ctids);
+ 
+ 	while((unresolved = hash_seq_search(&seq_status)) != NULL)
+ 	{
+ 		ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
+ 		raw_heap_insert(state, unresolved->tuple);
+ 
+ 		/* don't bother removing and pfreeing the entry, we're going
+ 		 * to free the whole memory context after the scan anyway
+ 		 */
+ 	}
+ 	
+ 	/* WAL-log and unlock the last page */
+ 	if(BufferIsValid(state->rs_buf))
+ 	{
+ 		if(state->rs_use_wal)
+ 			log_newpage(&state->rs_new_rel->rd_node,
+ 						BufferGetBlockNumber(state->rs_buf),
+ 						BufferGetPage(state->rs_buf));
+ 
+ 		MarkBufferDirty(state->rs_buf);
+ 
+ 		UnlockReleaseBuffer(state->rs_buf);
+ 		state->rs_buf = InvalidBuffer;
+ 	}
+ 
+ 	MemoryContextDelete(state->rs_cxt);
+ 	pfree(state);
+ }
+ 
+ /*
+  * Register a dead tuple with an ongoing rewrite. Dead tuples are not
+  * copied to the new table, but we still make note of them so that we
+  * can release some resources earlier.
+  */
+ void
+ rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
+ {
+ 	/* If we have already seen an earlier tuple in the update chain that
+ 	 * points to this tuple, let's forget about that earlier tuple. It's
+ 	 * in fact dead as well, our simple xmax < OldestXmin test in 
+ 	 * HeapTupleSatisfiesVacuum just wasn't enough to detect it. It
+ 	 * happens when xmin of a tuple is greater than xmax, which sounds
+ 	 * counter-intuitive but is perfectly valid.
+ 	 *
+ 	 * We don't bother to try to detect the situation the other way
+ 	 * round, when we encounter the dead tuple first and then the
+ 	 * recently dead one that points to it. If that happens, we'll
+ 	 * have some unmatched entries in the unresolvedCtids hash table
+ 	 * at the end. That can happen anyway, because a vacuum might
+ 	 * have removed the dead tuple in the chain before us.
+ 	 */
+ 	hash_search(state->rs_unresolved_ctids,
+ 				(void *) &(old_tuple->t_self),
+ 				HASH_REMOVE, NULL);
+ }
+ 
+ /*
+  * Add a tuple to the new heap.
+  *
+  * Visibility information is copied from the original tuple.
+  *
+  * state		opaque state as returned by begin_heap_rewrite
+  * old_tuple	original tuple in the old heap
+  * new_tuple	new, rewritten tuple to be inserted to new heap
+  */
+ void
+ rewrite_heap_tuple(RewriteState state,
+ 				   HeapTuple old_tuple, HeapTuple new_tuple)
+ {
+ 	MemoryContext saved_cxt;
+ 	ItemPointerData old_tid;
+ 
+ 	saved_cxt = MemoryContextSwitchTo(state->rs_cxt);
+ 
+ 	/* Preserve the visibility information. Also preserve the HEAP_UPDATED
+ 	 * flag; if the original tuple was an updated row version, so is the 
+ 	 * new one.
+ 	 */
+ 	memcpy(&new_tuple->t_data->t_choice.t_heap,
+ 		   &old_tuple->t_data->t_choice.t_heap,
+ 		   sizeof(HeapTupleFields));
+ 
+ 	new_tuple->t_data->t_infomask &= ~(HEAP_XACT_MASK | HEAP_UPDATED);
+ 	new_tuple->t_data->t_infomask |=
+ 		old_tuple->t_data->t_infomask & (HEAP_XACT_MASK | HEAP_UPDATED);
+ 
+ 	/* Invalid ctid means that ctid should point to the tuple itself.
+ 	 * We'll override it below if the tuple is part of an update chain.
+ 	 */
+ 	ItemPointerSetInvalid(&new_tuple->t_data->t_ctid);
+ 
+ 	/* When we encounter a tuple that has been updated, check
+ 	 * the old-to-new mapping hash table. If a match is found there,
+ 	 * we've already copied the tuple that t_ctid points to, so we
+ 	 * set the ctid of this tuple point to the new location, and insert
+ 	 * the new tuple. Otherwise we store the tuple in the unresolved_ctids
+ 	 * hash table and deal with it later, after inserting the tuple t_ctid
+ 	 * points to.
+ 	 */
+ 	if (!(old_tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
+ 									   HEAP_IS_LOCKED)) &&
+ 		!(ItemPointerEquals(&(old_tuple->t_self),
+ 							&(old_tuple->t_data->t_ctid))))
+ 	{
+ 		OldToNewMapping mapping;
+ 
+ 		mapping = (OldToNewMapping)
+ 			hash_search(state->rs_old_new_tid_map,
+ 						(void *) &(old_tuple->t_data->t_ctid),
+ 						HASH_FIND, NULL);
+ 
+ 		if (mapping != NULL)
+ 		{
+ 			/* We've already copied the tuple t_ctid points to.
+ 			 * Use the new tid of that tuple */
+ 			new_tuple->t_data->t_ctid = mapping->new_tid;
+ 
+ 			hash_search(state->rs_old_new_tid_map,
+ 						(void *) &(old_tuple->t_data->t_ctid),
+ 						HASH_REMOVE, NULL);
+ 		}
+ 		else
+ 		{
+ 			/* We haven't seen the tuple t_ctid points to yet. Stash
+ 			 * it into unresolved_ctids table and deal with it later.
+ 			 */
+ 			UnresolvedCtid unresolved;
+ 			bool found;
+ 
+ 			unresolved = hash_search(state->rs_unresolved_ctids,
+ 									 (void *) &(old_tuple->t_data->t_ctid),
+ 									 HASH_ENTER, &found);
+ 			Assert(!found);
+ 
+ 			unresolved->old_tid = old_tuple->t_self;
+ 			unresolved->tuple = heap_copytuple(new_tuple);
+ 
+ 			MemoryContextSwitchTo(saved_cxt);
+ 			return;
+ 		}
+ 	}
+ 
+ 	old_tid = old_tuple->t_self;
+ 
+ 	for(;;)
+ 	{
+ 		ItemPointerData new_tid;
+ 
+ 		/* Insert the tuple */
+ 		raw_heap_insert(state, new_tuple);
+ 		new_tid = new_tuple->t_self;
+ 
+ 		/*
+ 		 * When we encounter a tuple that's an updated version
+ 		 * of a row, check if the previous tuple in the update chain
+ 		 * is still visible to someone. The previous tuple's xmax
+ 		 * is equal to this tuples xmin, so we can do the same check
+ 		 * HeapTupleSatisfiesVacuum would do for the previous tuple,
+ 		 * using xmin of this tuple. We know that xmin of this tuple
+ 		 * committed, otherwise this tuple would be dead and the caller
+ 		 * wouldn't have passed it to us in the first place.
+ 		 */
+ 		if ((new_tuple->t_data->t_infomask & HEAP_UPDATED) &&
+ 			!TransactionIdPrecedes(HeapTupleHeaderGetXmin(new_tuple->t_data),
+ 								   state->rs_oldest_xmin))
+ 		{
+ 			/* The previous tuple in the update chain is RECENTLY_DEAD.
+ 			 * Let's see if we've seen it already. */
+ 			UnresolvedCtid unresolved;
+ 
+ 			unresolved = hash_search(state->rs_unresolved_ctids,
+ 									 (void *) &old_tid,
+ 									 HASH_FIND, NULL);
+ 
+ 			if (unresolved != NULL)
+ 			{
+ 				/* We have seen and memorized the previous tuple already. 
+ 				 * Now that we know where we inserted the tuple its t_ctid 
+ 				 * points to, fix its t_ctid and insert it to the new heap.
+ 				 */
+ 				HeapTuple		prev_tup	 = unresolved->tuple;
+ 				ItemPointerData prev_old_tid = unresolved->old_tid;
+ 
+ 				prev_tup->t_data->t_ctid = new_tid;
+ 
+ 				hash_search(state->rs_unresolved_ctids,
+ 							(void *) &old_tid,
+ 							HASH_REMOVE, NULL);
+ 
+ 				/* loop back to insert the previous tuple in the chain */
+ 				old_tid = prev_old_tid;
+ 				new_tuple = prev_tup;
+ 				continue;
+ 			}
+ 			else
+ 			{
+ 				/* Remember the new tid of this tuple. We'll use it to set
+ 				 * the ctid when we find the previous tuple in the chain
+ 				 */
+ 				OldToNewMapping mapping;
+ 				bool found;
+ 
+ 				mapping = hash_search(state->rs_old_new_tid_map,
+ 									  (void *) &old_tid,
+ 									  HASH_ENTER, &found);
+ 				Assert(!found);
+ 
+ 				mapping->new_tid = new_tid;
+ 			}
+ 		}
+ 		break;
+ 	}
+ 
+ 	MemoryContextSwitchTo(saved_cxt);
+ }
+ 
+ /*
+  * Insert a tuple to the new relation.
+  *
+  * t_self of the tuple is set to the new TID of the tuple. If t_ctid of the
+  * tuple is invalid on entry, it's replaced with the new TID as well.
+  */
+ static void
+ raw_heap_insert(RewriteState state, HeapTuple new_tup)
+ {
+ 	BlockNumber		blkno	= InvalidBlockNumber;
+ 	Page			page	= NULL;
+ 	Size			pageFreeSpace, saveFreeSpace;
+ 	Size			len;
+ 	OffsetNumber	newoff;
+ 	HeapTuple		tup;
+ 
+ 	/*
+ 	 * If the new tuple is too big for storage or contains already toasted
+ 	 * out-of-line attributes from some other relation, invoke the toaster.
+ 	 *
+ 	 * Note: below this point, tup is the data we actually intend to store
+ 	 * into the relation; new_tup is the caller's original untoasted data.
+ 	 */
+ 	if (HeapTupleHasExternal(new_tup) || new_tup->t_len > TOAST_TUPLE_THRESHOLD)
+ 		tup = toast_insert_or_update(state->rs_new_rel, new_tup, NULL,
+ 									 state->rs_use_wal, false);
+ 	else
+ 		tup = new_tup;
+ 
+ 	len = MAXALIGN(tup->t_len);		/* be conservative */
+ 
+ 	/*
+ 	 * If we're gonna fail for oversize tuple, do it right away
+ 	 */
+ 	if (len > MaxHeapTupleSize)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ 				 errmsg("row is too big: size %lu, maximum size %lu",
+ 						(unsigned long) len,
+ 						(unsigned long) MaxHeapTupleSize)));
+ 
+ 	/* Compute desired extra freespace due to fillfactor option */
+ 	saveFreeSpace = RelationGetTargetPageFreeSpace(state->rs_new_rel,
+ 												   HEAP_DEFAULT_FILLFACTOR);
+ 
+ 	/* Now we can check to see if there's enough free space here. */
+ 	if (BufferIsValid(state->rs_buf))
+ 	{
+ 		blkno = BufferGetBlockNumber(state->rs_buf);
+ 		page = (Page) BufferGetPage(state->rs_buf);
+ 		pageFreeSpace = PageGetFreeSpace(page);
+ 	
+ 		if (len + saveFreeSpace > pageFreeSpace)
+ 		{
+ 			if(state->rs_use_wal)
+ 				log_newpage(&state->rs_new_rel->rd_node, blkno, page);
+ 			
+ 			MarkBufferDirty(state->rs_buf);
+ 
+ 			UnlockReleaseBuffer(state->rs_buf);
+ 			state->rs_buf = InvalidBuffer;
+ 		}
+ 	}
+ 
+ 	if (!BufferIsValid(state->rs_buf))
+ 	{
+ 		/* There's no current buffer to insert to. Extend relation to get
+ 		 * a new page.
+ 		 */
+ 		state->rs_buf = ReadBuffer(state->rs_new_rel, P_NEW);
+ 		blkno = BufferGetBlockNumber(state->rs_buf);
+ 		page = (Page) BufferGetPage(state->rs_buf);
+ 
+ 		LockBuffer(state->rs_buf, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 		/*
+ 		 * We need to initialize the empty new page.  Double-check that it 
+ 		 * really is empty (this should never happen, but if it does we
+ 		 * don't want to risk wiping out valid data).
+ 		 */
+ 
+ 		if (!PageIsNew((PageHeader) page))
+ 			elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
+ 				 BufferGetBlockNumber(state->rs_buf),
+ 				 RelationGetRelationName(state->rs_new_rel));
+ 
+ 		PageInit(page, BufferGetPageSize(state->rs_buf), 0);
+ 	}
+ 
+ 	/* We now have a page pinned and locked to insert the new tuple to */
+ 
+ 	newoff = PageAddItem(page, (Item) tup->t_data, len, 
+ 						 InvalidOffsetNumber, LP_USED);
+ 	if (newoff == InvalidOffsetNumber)
+ 		elog(ERROR, "failed to add tuple");
+ 
+ 	/* Update t_self to the actual position where it was stored */
+ 	ItemPointerSet(&(new_tup->t_self), blkno, newoff);
+ 
+ 	/* Insert the correct position into CTID of the stored tuple, too,
+ 	 * if the caller didn't supply a valid CTID.
+ 	 */
+ 	if(!ItemPointerIsValid(&new_tup->t_data->t_ctid))
+ 	{
+ 		ItemId			newitemid;
+ 		HeapTupleHeader onpage_tup;
+ 
+ 		newitemid = PageGetItemId(page, newoff);
+ 		onpage_tup = (HeapTupleHeader) PageGetItem(page, newitemid);
+ 
+ 		onpage_tup->t_ctid = new_tup->t_self;
+ 	}
+ 
+ 	/* If tup is a private copy, release it. */
+ 	if (tup != new_tup)
+ 		heap_freetuple(tup);
+ }
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.218
diff -c -r1.218 tablecmds.c
*** src/backend/commands/tablecmds.c	19 Mar 2007 23:38:29 -0000	1.218
--- src/backend/commands/tablecmds.c	20 Mar 2007 18:59:08 -0000
***************
*** 5855,5890 ****
  	{
  		smgrread(src, blkno, buf);
  
- 		/* XLOG stuff */
  		if (use_wal)
! 		{
! 			xl_heap_newpage xlrec;
! 			XLogRecPtr	recptr;
! 			XLogRecData rdata[2];
! 
! 			/* NO ELOG(ERROR) from here till newpage op is logged */
! 			START_CRIT_SECTION();
! 
! 			xlrec.node = dst->smgr_rnode;
! 			xlrec.blkno = blkno;
! 
! 			rdata[0].data = (char *) &xlrec;
! 			rdata[0].len = SizeOfHeapNewpage;
! 			rdata[0].buffer = InvalidBuffer;
! 			rdata[0].next = &(rdata[1]);
! 
! 			rdata[1].data = (char *) page;
! 			rdata[1].len = BLCKSZ;
! 			rdata[1].buffer = InvalidBuffer;
! 			rdata[1].next = NULL;
! 
! 			recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_NEWPAGE, rdata);
! 
! 			PageSetLSN(page, recptr);
! 			PageSetTLI(page, ThisTimeLineID);
! 
! 			END_CRIT_SECTION();
! 		}
  
  		/*
  		 * Now write the page.	We say isTemp = true even if it's not a temp
--- 5855,5862 ----
  	{
  		smgrread(src, blkno, buf);
  
  		if (use_wal)
! 			log_newpage(&dst->smgr_rnode, blkno, page);
  
  		/*
  		 * Now write the page.	We say isTemp = true even if it's not a temp
Index: src/include/access/heapam.h
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/include/access/heapam.h,v
retrieving revision 1.121
diff -c -r1.121 heapam.h
*** src/include/access/heapam.h	29 Mar 2007 00:15:39 -0000	1.121
--- src/include/access/heapam.h	29 Mar 2007 08:27:34 -0000
***************
*** 194,199 ****
--- 194,200 ----
  extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
  								  TransactionId cutoff_xid,
  								  OffsetNumber *offsets, int offcnt);
+ extern XLogRecPtr log_newpage(RelFileNode *rnode, BlockNumber blk, Page page);
  
  /* in common/heaptuple.c */
  extern Size heap_compute_data_size(TupleDesc tupleDesc,
Index: src/include/commands/rewriteheap.h
===================================================================
RCS file: src/include/commands/rewriteheap.h
diff -N src/include/commands/rewriteheap.h
*** /dev/null	1 Jan 1970 00:00:00 -0000
--- src/include/commands/rewriteheap.h	20 Mar 2007 17:52:15 -0000
***************
*** 0 ****
--- 1,29 ----
+ /*-------------------------------------------------------------------------
+  *
+  * rewriteheap.h
+  *	  header file for heap rewrite support functions
+  *
+  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994-5, Regents of the University of California
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef REWRITE_HEAP_H
+ #define REWRITE_HEAP_H
+ 
+ #include "access/htup.h"
+ #include "utils/rel.h"
+ 
+ /* definition is private to rewriteheap.c */
+ typedef struct RewriteStateData *RewriteState;
+ 
+ extern RewriteState begin_heap_rewrite(Relation NewHeap, 
+ 									   TransactionId OldestXmin, bool use_wal);
+ extern void end_heap_rewrite(RewriteState state);
+ extern void rewrite_heap_dead_tuple(RewriteState state, HeapTuple oldTuple);
+ extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple, 
+ 							   HeapTuple newTuple);
+ 
+ #endif   /* REWRITE_H */
Index: src/test/regress/expected/cluster.out
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/test/regress/expected/cluster.out,v
retrieving revision 1.17
diff -c -r1.17 cluster.out
*** src/test/regress/expected/cluster.out	7 Jul 2005 20:40:01 -0000	1.17
--- src/test/regress/expected/cluster.out	20 Mar 2007 16:50:04 -0000
***************
*** 382,389 ****
--- 382,428 ----
   2
  (2 rows)
  
+ -- Test MVCC-safety of cluster. There isn't much we can do to verify the
+ -- results with a single backend...
+ CREATE TABLE clustertest (key int PRIMARY KEY);
+ NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "clustertest_pkey" for table "clustertest"
+ INSERT INTO clustertest VALUES (10);
+ INSERT INTO clustertest VALUES (20);
+ INSERT INTO clustertest VALUES (30);
+ INSERT INTO clustertest VALUES (40);
+ INSERT INTO clustertest VALUES (50);
+ -- Test update where the old row version is found first in the scan
+ UPDATE clustertest SET key = 100 WHERE key = 10;
+ -- Test update where the new row version is found first in the scan
+ UPDATE clustertest SET key = 35 WHERE key = 40;
+ -- Test longer update chain 
+ UPDATE clustertest SET key = 60 WHERE key = 50;
+ UPDATE clustertest SET key = 70 WHERE key = 60;
+ UPDATE clustertest SET key = 80 WHERE key = 90;
+ SELECT * FROM clustertest;
+  key 
+ -----
+   20
+   30
+  100
+   35
+   70
+ (5 rows)
+ 
+ CLUSTER clustertest_pkey ON clustertest;
+ SELECT * FROM clustertest;
+  key 
+ -----
+   20
+   30
+   35
+   70
+  100
+ (5 rows)
+ 
  -- clean up
  \c -
+ DROP TABLE clustertest;
  DROP TABLE clstr_1;
  DROP TABLE clstr_2;
  DROP TABLE clstr_3;
Index: src/test/regress/sql/cluster.sql
===================================================================
RCS file: /home/hlinnaka/pgcvsrepository/pgsql/src/test/regress/sql/cluster.sql,v
retrieving revision 1.9
diff -c -r1.9 cluster.sql
*** src/test/regress/sql/cluster.sql	7 Jul 2005 20:40:02 -0000	1.9
--- src/test/regress/sql/cluster.sql	20 Mar 2007 17:52:48 -0000
***************
*** 153,160 ****
--- 153,187 ----
  CLUSTER clstr_1;
  SELECT * FROM clstr_1;
  
+ -- Test MVCC-safety of cluster. There isn't much we can do to verify the
+ -- results with a single backend...
+ 
+ CREATE TABLE clustertest (key int PRIMARY KEY);
+ 
+ INSERT INTO clustertest VALUES (10);
+ INSERT INTO clustertest VALUES (20);
+ INSERT INTO clustertest VALUES (30);
+ INSERT INTO clustertest VALUES (40);
+ INSERT INTO clustertest VALUES (50);
+ 
+ -- Test update where the old row version is found first in the scan
+ UPDATE clustertest SET key = 100 WHERE key = 10;
+ 
+ -- Test update where the new row version is found first in the scan
+ UPDATE clustertest SET key = 35 WHERE key = 40;
+ 
+ -- Test longer update chain 
+ UPDATE clustertest SET key = 60 WHERE key = 50;
+ UPDATE clustertest SET key = 70 WHERE key = 60;
+ UPDATE clustertest SET key = 80 WHERE key = 90;
+ 
+ SELECT * FROM clustertest;
+ CLUSTER clustertest_pkey ON clustertest;
+ SELECT * FROM clustertest;
+ 
  -- clean up
  \c -
+ DROP TABLE clustertest;
  DROP TABLE clstr_1;
  DROP TABLE clstr_2;
  DROP TABLE clstr_3;
